YARN-3904. Refactor timelineservice.storage to add support to online and offline aggregation writers (Li Lu via sjlee)
parent 561c746cbe
commit 102b56ee96

@@ -508,13 +508,12 @@
<!-- Ignore SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING warnings for Timeline Phoenix storage. -->
<!-- Since we're using dynamic columns, we have to generate SQL statements dynamically -->
<Match>
  <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixTimelineWriterImpl" />
  <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixOfflineAggregationWriterImpl" />
  <Or>
    <Method name="storeEntityVariableLengthFields" />
    <Method name="storeEvents" />
    <Method name="storeMetrics" />
    <Method name="write" />
    <Method name="writeAggregatedEntity" />
  </Or>
  <Bug pattern="SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING" />
</Match>

<!-- Following fields are used in ErrorsAndWarningsBlock, which is not a part of analysis of findbugs -->
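The suppressed warning comes from Phoenix dynamic columns: because the column set varies per entity, the SQL text cannot be a compile-time constant. A minimal illustration, modeled on the verification queries in the test added further down in this patch (the metric id HDFS_BYTES_READ is just the sample value used there):

    // Dynamic columns are declared inline in the query, so the statement text
    // necessarily changes with the metric ids carried by each entity.
    String sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
        + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
        + "(m.HDFS_BYTES_READ VARBINARY)";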
@@ -2230,6 +2230,16 @@ public class YarnConfiguration extends Configuration {
  public static final long DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME =
      7*24*60*60*1000; // 7 days

  // Timeline service v2 offline aggregation related keys
  public static final String TIMELINE_OFFLINE_AGGREGATION_PREFIX =
      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "aggregation.offline.";
  public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR
      = TIMELINE_OFFLINE_AGGREGATION_PREFIX
      + "phoenix.connectionString";

  public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT
      = "jdbc:phoenix:localhost:2181:/hbase";

  // ///////////////////////////////
  // Shared Cache Configs
  // ///////////////////////////////
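As a usage note for the new keys, a minimal sketch of overriding the Phoenix connection string for the offline aggregation writer; the quorum host below is a placeholder, not a shipped default:

    Configuration conf = new YarnConfiguration();
    // Falls back to "jdbc:phoenix:localhost:2181:/hbase" when not set.
    conf.set(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
        "jdbc:phoenix:zk1.example.com:2181:/hbase");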
@@ -0,0 +1,66 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;

import java.io.IOException;

/**
 * YARN timeline service v2 offline aggregation storage interface.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class OfflineAggregationWriter extends AbstractService {

  /**
   * Construct the offline writer.
   *
   * @param name service name
   */
  public OfflineAggregationWriter(String name) {
    super(name);
  }

  /**
   * Persist aggregated timeline entities to the offline store based on which
   * track this entity is to be rolled up to. The tracks along which
   * aggregations are to be done are given by {@link OfflineAggregationInfo}.
   *
   * @param context a {@link TimelineCollectorContext} object that describes the
   *                context information of the aggregated data. Depending on the
   *                type of the aggregation, some fields of this context may be
   *                empty or null.
   * @param entities {@link TimelineEntities} to be persisted.
   * @param info an {@link OfflineAggregationInfo} object that describes the
   *             detail of the aggregation. The currently supported option is
   *             {@link OfflineAggregationInfo#FLOW_AGGREGATION}.
   * @return a {@link TimelineWriteResponse} object.
   * @throws IOException if the write operation fails.
   */
  abstract TimelineWriteResponse writeAggregatedEntity(
      TimelineCollectorContext context,
      TimelineEntities entities, OfflineAggregationInfo info) throws IOException;
}
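To show the contract this abstract service defines, here is a hypothetical no-op subclass; the class name and its body are illustrative only and not part of this patch:

    package org.apache.hadoop.yarn.server.timelineservice.storage;

    import java.io.IOException;

    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
    import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;

    /** Illustrative sketch only: accepts aggregated entities and discards them. */
    public class NoOpOfflineAggregationWriter extends OfflineAggregationWriter {
      public NoOpOfflineAggregationWriter() {
        super(NoOpOfflineAggregationWriter.class.getName());
      }

      @Override
      TimelineWriteResponse writeAggregatedEntity(TimelineCollectorContext context,
          TimelineEntities entities, OfflineAggregationInfo info) throws IOException {
        // A real implementation, such as the Phoenix writer below, would persist
        // the entities keyed by the primary keys described in info.
        return new TimelineWriteResponse();
      }
    }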
@@ -0,0 +1,356 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||
import org.apache.phoenix.util.PropertiesUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Offline aggregation Phoenix storage. This storage currently consists of two
|
||||
* aggregation tables, one for flow level aggregation and one for user level
|
||||
* aggregation.
|
||||
*
|
||||
* Example table record:
|
||||
*
|
||||
* <pre>
|
||||
* |---------------------------|
|
||||
* | Primary | Column Family|
|
||||
* | key | metrics |
|
||||
* |---------------------------|
|
||||
* | row_key | metricId1: |
|
||||
* | | metricValue1 |
|
||||
* | | @timestamp1 |
|
||||
* | | |
|
||||
* | | metriciD1: |
|
||||
* | | metricValue2 |
|
||||
* | | @timestamp2 |
|
||||
* | | |
|
||||
* | | metricId2: |
|
||||
* | | metricValue1 |
|
||||
* | | @timestamp2 |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* | | |
|
||||
* |---------------------------|
|
||||
* </pre>
|
||||
*
|
||||
* For the flow aggregation table, the primary key contains user, cluster, and
|
||||
* flow id. For user aggregation table,the primary key is user.
|
||||
*
|
||||
* Metrics column family stores all aggregated metrics for each record.
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public class PhoenixOfflineAggregationWriterImpl
|
||||
extends OfflineAggregationWriter {
|
||||
|
||||
private static final Log LOG
|
||||
= LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class);
|
||||
private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
|
||||
= "timeline_cf_placeholder";
|
||||
|
||||
/** Default Phoenix JDBC driver name */
|
||||
private static final String DRIVER_CLASS_NAME
|
||||
= "org.apache.phoenix.jdbc.PhoenixDriver";
|
||||
|
||||
/** Default Phoenix timeline config column family */
|
||||
private static final String METRIC_COLUMN_FAMILY = "m.";
|
||||
/** Default Phoenix timeline info column family */
|
||||
private static final String INFO_COLUMN_FAMILY = "i.";
|
||||
/** Default separator for Phoenix storage */
|
||||
private static final String AGGREGATION_STORAGE_SEPARATOR = ";";
|
||||
|
||||
/** Connection string to the deployed Phoenix cluster */
|
||||
private String connString = null;
|
||||
private Properties connProperties = new Properties();
|
||||
|
||||
public PhoenixOfflineAggregationWriterImpl(Properties prop) {
|
||||
super(PhoenixOfflineAggregationWriterImpl.class.getName());
|
||||
connProperties = PropertiesUtil.deepCopy(prop);
|
||||
}
|
||||
|
||||
public PhoenixOfflineAggregationWriterImpl() {
|
||||
super(PhoenixOfflineAggregationWriterImpl.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
Class.forName(DRIVER_CLASS_NAME);
|
||||
// so check it here and only read in the config if it's not overridden.
|
||||
connString =
|
||||
conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
|
||||
YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT);
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelineWriteResponse writeAggregatedEntity(
|
||||
TimelineCollectorContext context, TimelineEntities entities,
|
||||
OfflineAggregationInfo info) throws IOException {
|
||||
TimelineWriteResponse response = new TimelineWriteResponse();
|
||||
String sql = "UPSERT INTO " + info.getTableName()
|
||||
+ " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
|
||||
+ ", created_time, modified_time, metric_names) "
|
||||
+ "VALUES ("
|
||||
+ StringUtils.repeat("?,", info.getPrimaryKeyList().length)
|
||||
+ "?, ?, ?)";
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("TimelineEntity write SQL: " + sql);
|
||||
}
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement ps = conn.prepareStatement(sql)) {
|
||||
for (TimelineEntity entity : entities.getEntities()) {
|
||||
HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>();
|
||||
if (entity.getMetrics() != null) {
|
||||
for (TimelineMetric m : entity.getMetrics()) {
|
||||
formattedMetrics.put(m.getId(), m);
|
||||
}
|
||||
}
|
||||
int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
|
||||
ps.setLong(idx++, entity.getCreatedTime());
|
||||
ps.setLong(idx++, entity.getModifiedTime());
|
||||
ps.setString(idx++, StringUtils.join(formattedMetrics.keySet().toArray(),
|
||||
AGGREGATION_STORAGE_SEPARATOR));
|
||||
ps.execute();
|
||||
|
||||
storeEntityVariableLengthFields(entity, formattedMetrics, context, conn,
|
||||
info);
|
||||
|
||||
conn.commit();
|
||||
}
|
||||
} catch (SQLException se) {
|
||||
LOG.error("Failed to add entity to Phoenix " + se.getMessage());
|
||||
throw new IOException(se);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception on getting connection: " + e.getMessage());
|
||||
throw new IOException(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Phoenix tables for offline aggregation storage if the tables do not
|
||||
* exist.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void createPhoenixTables() throws IOException {
|
||||
// Create tables if necessary
|
||||
try (Connection conn = getConnection();
|
||||
Statement stmt = conn.createStatement()) {
|
||||
// Table schema defined as in YARN-3817.
|
||||
String sql = "CREATE TABLE IF NOT EXISTS "
|
||||
+ OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
|
||||
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
|
||||
+ "flow_name VARCHAR NOT NULL, "
|
||||
+ "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
|
||||
+ METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
|
||||
+ "metric_names VARCHAR, info_keys VARCHAR "
|
||||
+ "CONSTRAINT pk PRIMARY KEY("
|
||||
+ "user, cluster, flow_name))";
|
||||
stmt.executeUpdate(sql);
|
||||
sql = "CREATE TABLE IF NOT EXISTS "
|
||||
+ OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
|
||||
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
|
||||
+ "created_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
|
||||
+ METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
|
||||
+ "metric_names VARCHAR, info_keys VARCHAR "
|
||||
+ "CONSTRAINT pk PRIMARY KEY(user, cluster))";
|
||||
stmt.executeUpdate(sql);
|
||||
conn.commit();
|
||||
} catch (SQLException se) {
|
||||
LOG.error("Failed in init data " + se.getLocalizedMessage());
|
||||
throw new IOException(se);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Utility functions
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
Connection getConnection() throws IOException {
|
||||
Connection conn;
|
||||
try {
|
||||
conn = DriverManager.getConnection(connString, connProperties);
|
||||
conn.setAutoCommit(false);
|
||||
} catch (SQLException se) {
|
||||
LOG.error("Failed to connect to phoenix server! "
|
||||
+ se.getLocalizedMessage());
|
||||
throw new IOException(se);
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
// WARNING: This method will permanently drop a table!
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
void dropTable(String tableName) throws Exception {
|
||||
try (Connection conn = getConnection();
|
||||
Statement stmt = conn.createStatement()) {
|
||||
String sql = "DROP TABLE " + tableName;
|
||||
stmt.executeUpdate(sql);
|
||||
} catch (SQLException se) {
|
||||
LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
|
||||
throw se;
|
||||
}
|
||||
}
|
||||
|
||||
private static class DynamicColumns<K> {
|
||||
static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
|
||||
static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
|
||||
String columnFamilyPrefix;
|
||||
String type;
|
||||
Set<K> columns;
|
||||
|
||||
public DynamicColumns(String columnFamilyPrefix, String type,
|
||||
Set<K> keyValues) {
|
||||
this.columnFamilyPrefix = columnFamilyPrefix;
|
||||
this.columns = keyValues;
|
||||
this.type = type;
|
||||
}
|
||||
}
|
||||
|
||||
private static <K> StringBuilder appendColumnsSQL(
|
||||
StringBuilder colNames, DynamicColumns<K> cfInfo) {
|
||||
// Prepare the sql template by iterating through all keys
|
||||
for (K key : cfInfo.columns) {
|
||||
colNames.append(",").append(cfInfo.columnFamilyPrefix)
|
||||
.append(key.toString()).append(cfInfo.type);
|
||||
}
|
||||
return colNames;
|
||||
}
|
||||
|
||||
private static <K, V> int setValuesForColumnFamily(
|
||||
PreparedStatement ps, Map<K, V> keyValues, int startPos,
|
||||
boolean converToBytes) throws SQLException {
|
||||
int idx = startPos;
|
||||
for (Map.Entry<K, V> entry : keyValues.entrySet()) {
|
||||
V value = entry.getValue();
|
||||
if (value instanceof Collection) {
|
||||
ps.setString(idx++, StringUtils.join(
|
||||
(Collection) value, AGGREGATION_STORAGE_SEPARATOR));
|
||||
} else {
|
||||
if (converToBytes) {
|
||||
try {
|
||||
ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
|
||||
} catch (IOException ie) {
|
||||
LOG.error("Exception in converting values into bytes "
|
||||
+ ie.getMessage());
|
||||
throw new SQLException(ie);
|
||||
}
|
||||
} else {
|
||||
ps.setString(idx++, value.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
|
||||
private static <K, V> int setBytesForColumnFamily(
|
||||
PreparedStatement ps, Map<K, V> keyValues, int startPos)
|
||||
throws SQLException {
|
||||
return setValuesForColumnFamily(ps, keyValues, startPos, true);
|
||||
}
|
||||
|
||||
private static <K, V> int setStringsForColumnFamily(
|
||||
PreparedStatement ps, Map<K, V> keyValues, int startPos)
|
||||
throws SQLException {
|
||||
return setValuesForColumnFamily(ps, keyValues, startPos, false);
|
||||
}
|
||||
|
||||
private static void storeEntityVariableLengthFields(TimelineEntity entity,
|
||||
Map<String, TimelineMetric> formattedMetrics,
|
||||
TimelineCollectorContext context, Connection conn,
|
||||
OfflineAggregationInfo aggregationInfo) throws SQLException {
|
||||
int numPlaceholders = 0;
|
||||
StringBuilder columnDefs = new StringBuilder(
|
||||
StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
|
||||
if (formattedMetrics != null && formattedMetrics.size() > 0) {
|
||||
appendColumnsSQL(columnDefs, new DynamicColumns<>(
|
||||
METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
|
||||
formattedMetrics.keySet()));
|
||||
numPlaceholders += formattedMetrics.keySet().size();
|
||||
}
|
||||
if (numPlaceholders == 0) {
|
||||
return;
|
||||
}
|
||||
StringBuilder placeholders = new StringBuilder();
|
||||
placeholders.append(
|
||||
StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
|
||||
// numPlaceholders >= 1 now
|
||||
placeholders.append("?")
|
||||
.append(StringUtils.repeat(",?", numPlaceholders - 1));
|
||||
String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
|
||||
.append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
|
||||
.append(") VALUES(").append(placeholders).append(")").toString();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("SQL statement for variable length fields: "
|
||||
+ sqlVariableLengthFields);
|
||||
}
|
||||
// Use try with resource statement for the prepared statement
|
||||
try (PreparedStatement psVariableLengthFields =
|
||||
conn.prepareStatement(sqlVariableLengthFields)) {
|
||||
int idx = aggregationInfo.setStringsForPrimaryKey(
|
||||
psVariableLengthFields, context, null, 1);
|
||||
if (formattedMetrics != null && formattedMetrics.size() > 0) {
|
||||
idx = setBytesForColumnFamily(
|
||||
psVariableLengthFields, formattedMetrics, idx);
|
||||
}
|
||||
psVariableLengthFields.execute();
|
||||
}
|
||||
}
|
||||
}
|
|
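For orientation, a rough end-to-end usage sketch of the writer above, assembled from the schema creator changes and the unit test elsewhere in this patch; entity construction is elided and the connection string is the default placeholder:

    PhoenixOfflineAggregationWriterImpl writer =
        new PhoenixOfflineAggregationWriterImpl();
    Configuration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
        "jdbc:phoenix:localhost:2181:/hbase");
    writer.init(conf);
    writer.start();
    writer.createPhoenixTables();  // CREATE TABLE IF NOT EXISTS for both tables

    TimelineCollectorContext context = new TimelineCollectorContext(
        "cluster_1", "user1", "testFlow", null, 0, null);
    TimelineEntities entities = new TimelineEntities();
    // ... add aggregated TimelineEntity objects carrying their metrics ...
    writer.writeAggregatedEntity(context, entities,
        OfflineAggregationInfo.FLOW_AGGREGATION);
    writer.stop();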
@@ -1,530 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public class PhoenixTimelineWriterImpl extends AbstractService
|
||||
implements TimelineWriter {
|
||||
|
||||
public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR
|
||||
= YarnConfiguration.TIMELINE_SERVICE_PREFIX
|
||||
+ "writer.phoenix.connectionString";
|
||||
|
||||
public static final String TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT
|
||||
= "jdbc:phoenix:localhost:2181:/hbase";
|
||||
|
||||
private static final Log LOG
|
||||
= LogFactory.getLog(PhoenixTimelineWriterImpl.class);
|
||||
private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
|
||||
= "timeline_cf_placeholder";
|
||||
// These lists are not taking effects in table creations.
|
||||
private static final String[] PHOENIX_STORAGE_PK_LIST
|
||||
= {"cluster", "user", "flow_name", "flow_version", "flow_run", "app_id",
|
||||
"type", "entity_id"};
|
||||
private static final String[] TIMELINE_EVENT_EXTRA_PK_LIST =
|
||||
{"timestamp", "event_id"};
|
||||
private static final String[] TIMELINE_METRIC_EXTRA_PK_LIST =
|
||||
{"metric_id"};
|
||||
/** Default Phoenix JDBC driver name */
|
||||
private static final String DRIVER_CLASS_NAME
|
||||
= "org.apache.phoenix.jdbc.PhoenixDriver";
|
||||
|
||||
/** Default Phoenix timeline entity table name */
|
||||
@VisibleForTesting
|
||||
static final String ENTITY_TABLE_NAME = "timeline_entity";
|
||||
/** Default Phoenix event table name */
|
||||
@VisibleForTesting
|
||||
static final String EVENT_TABLE_NAME = "timeline_event";
|
||||
/** Default Phoenix metric table name */
|
||||
@VisibleForTesting
|
||||
static final String METRIC_TABLE_NAME = "metric_singledata";
|
||||
|
||||
/** Default Phoenix timeline config column family */
|
||||
private static final String CONFIG_COLUMN_FAMILY = "c.";
|
||||
/** Default Phoenix timeline info column family */
|
||||
private static final String INFO_COLUMN_FAMILY = "i.";
|
||||
/** Default Phoenix event info column family */
|
||||
private static final String EVENT_INFO_COLUMN_FAMILY = "ei.";
|
||||
/** Default Phoenix isRelatedTo column family */
|
||||
private static final String IS_RELATED_TO_FAMILY = "ir.";
|
||||
/** Default Phoenix relatesTo column family */
|
||||
private static final String RELATES_TO_FAMILY = "rt.";
|
||||
/** Default separator for Phoenix storage */
|
||||
private static final String PHOENIX_STORAGE_SEPARATOR = ";";
|
||||
|
||||
/** Connection string to the deployed Phoenix cluster */
|
||||
@VisibleForTesting
|
||||
String connString = null;
|
||||
@VisibleForTesting
|
||||
Properties connProperties = new Properties();
|
||||
|
||||
PhoenixTimelineWriterImpl() {
|
||||
super((PhoenixTimelineWriterImpl.class.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
// so check it here and only read in the config if it's not overridden.
|
||||
connString =
|
||||
conf.get(TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
|
||||
TIMELINE_SERVICE_PHEONIX_STORAGE_CONN_STR_DEFAULT);
|
||||
createTables();
|
||||
super.init(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelineWriteResponse write(String clusterId, String userId,
|
||||
String flowName, String flowVersion, long flowRunId, String appId,
|
||||
TimelineEntities entities) throws IOException {
|
||||
TimelineWriteResponse response = new TimelineWriteResponse();
|
||||
TimelineCollectorContext currContext = new TimelineCollectorContext(
|
||||
clusterId, userId, flowName, flowVersion, flowRunId, appId);
|
||||
String sql = "UPSERT INTO " + ENTITY_TABLE_NAME
|
||||
+ " (" + StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")
|
||||
+ ", creation_time, modified_time, configs) "
|
||||
+ "VALUES (" + StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)
|
||||
+ "?, ?, ?)";
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("TimelineEntity write SQL: " + sql);
|
||||
}
|
||||
|
||||
try (Connection conn = getConnection();
|
||||
PreparedStatement ps = conn.prepareStatement(sql)) {
|
||||
for (TimelineEntity entity : entities.getEntities()) {
|
||||
int idx = setStringsForPrimaryKey(ps, currContext, entity, 1);
|
||||
ps.setLong(idx++, entity.getCreatedTime());
|
||||
ps.setLong(idx++, entity.getModifiedTime());
|
||||
String configKeys = StringUtils.join(
|
||||
entity.getConfigs().keySet(), PHOENIX_STORAGE_SEPARATOR);
|
||||
ps.setString(idx++, configKeys);
|
||||
ps.execute();
|
||||
|
||||
storeEntityVariableLengthFields(entity, currContext, conn);
|
||||
storeEvents(entity, currContext, conn);
|
||||
storeMetrics(entity, currContext, conn);
|
||||
|
||||
conn.commit();
|
||||
}
|
||||
} catch (SQLException se) {
|
||||
LOG.error("Failed to add entity to Phoenix " + se.getMessage());
|
||||
throw new IOException(se);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception on getting connection: " + e.getMessage());
|
||||
throw new IOException(e);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregates the entity information to the timeline store based on which
|
||||
* track this entity is to be rolled up to The tracks along which aggregations
|
||||
* are to be done are given by {@link TimelineAggregationTrack}
|
||||
*
|
||||
* Any errors occurring for individual write request objects will be reported
|
||||
* in the response.
|
||||
*
|
||||
* @param data
|
||||
* a {@link TimelineEntity} object
|
||||
* a {@link TimelineAggregationTrack} enum value
|
||||
* @return a {@link TimelineWriteResponse} object.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public TimelineWriteResponse aggregate(TimelineEntity data,
|
||||
TimelineAggregationTrack track) throws IOException {
|
||||
return null;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
// currently no-op
|
||||
}
|
||||
|
||||
// Utility functions
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
Connection getConnection() throws IOException {
|
||||
Connection conn;
|
||||
try {
|
||||
Class.forName(DRIVER_CLASS_NAME);
|
||||
conn = DriverManager.getConnection(connString, connProperties);
|
||||
conn.setAutoCommit(false);
|
||||
} catch (SQLException se) {
|
||||
LOG.error("Failed to connect to phoenix server! "
|
||||
+ se.getLocalizedMessage());
|
||||
throw new IOException(se);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.error("Class not found! " + e.getLocalizedMessage());
|
||||
throw new IOException(e);
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
private void createTables() throws Exception {
|
||||
// Create tables if necessary
|
||||
try (Connection conn = getConnection();
|
||||
Statement stmt = conn.createStatement()) {
|
||||
// Table schema defined as in YARN-3134.
|
||||
String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME
|
||||
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
|
||||
+ "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
|
||||
+ "flow_run UNSIGNED_LONG NOT NULL, "
|
||||
+ "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
|
||||
+ "entity_id VARCHAR NOT NULL, "
|
||||
+ "creation_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
|
||||
+ "configs VARCHAR, "
|
||||
+ CONFIG_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
|
||||
+ INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
|
||||
+ IS_RELATED_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
|
||||
+ RELATES_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR "
|
||||
+ "CONSTRAINT pk PRIMARY KEY("
|
||||
+ "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
|
||||
+ "type, entity_id))";
|
||||
stmt.executeUpdate(sql);
|
||||
sql = "CREATE TABLE IF NOT EXISTS " + EVENT_TABLE_NAME
|
||||
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
|
||||
+ "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
|
||||
+ "flow_run UNSIGNED_LONG NOT NULL, "
|
||||
+ "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
|
||||
+ "entity_id VARCHAR NOT NULL, "
|
||||
+ "timestamp UNSIGNED_LONG NOT NULL, event_id VARCHAR NOT NULL, "
|
||||
+ EVENT_INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY "
|
||||
+ "CONSTRAINT pk PRIMARY KEY("
|
||||
+ "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
|
||||
+ "type, entity_id, timestamp DESC, event_id))";
|
||||
stmt.executeUpdate(sql);
|
||||
sql = "CREATE TABLE IF NOT EXISTS " + METRIC_TABLE_NAME
|
||||
+ "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
|
||||
+ "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
|
||||
+ "flow_run UNSIGNED_LONG NOT NULL, "
|
||||
+ "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
|
||||
+ "entity_id VARCHAR NOT NULL, "
|
||||
+ "metric_id VARCHAR NOT NULL, "
|
||||
+ "singledata VARBINARY, "
|
||||
+ "time UNSIGNED_LONG "
|
||||
+ "CONSTRAINT pk PRIMARY KEY("
|
||||
+ "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
|
||||
+ "type, entity_id, metric_id))";
|
||||
stmt.executeUpdate(sql);
|
||||
conn.commit();
|
||||
} catch (SQLException se) {
|
||||
LOG.error("Failed in init data " + se.getLocalizedMessage());
|
||||
throw se;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
private static class DynamicColumns<K> {
|
||||
static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
|
||||
static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
|
||||
String columnFamilyPrefix;
|
||||
String type;
|
||||
Set<K> columns;
|
||||
|
||||
public DynamicColumns(String columnFamilyPrefix, String type,
|
||||
Set<K> keyValues) {
|
||||
this.columnFamilyPrefix = columnFamilyPrefix;
|
||||
this.columns = keyValues;
|
||||
this.type = type;
|
||||
}
|
||||
}
|
||||
|
||||
private static <K> StringBuilder appendColumnsSQL(
|
||||
StringBuilder colNames, DynamicColumns<K> cfInfo) {
|
||||
// Prepare the sql template by iterating through all keys
|
||||
for (K key : cfInfo.columns) {
|
||||
colNames.append(",").append(cfInfo.columnFamilyPrefix)
|
||||
.append(key.toString()).append(cfInfo.type);
|
||||
}
|
||||
return colNames;
|
||||
}
|
||||
|
||||
private static <K, V> int setValuesForColumnFamily(
|
||||
PreparedStatement ps, Map<K, V> keyValues, int startPos,
|
||||
boolean converToBytes) throws SQLException {
|
||||
int idx = startPos;
|
||||
for (Map.Entry<K, V> entry : keyValues.entrySet()) {
|
||||
V value = entry.getValue();
|
||||
if (value instanceof Collection) {
|
||||
ps.setString(idx++, StringUtils.join(
|
||||
(Collection) value, PHOENIX_STORAGE_SEPARATOR));
|
||||
} else {
|
||||
if (converToBytes) {
|
||||
try {
|
||||
ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
|
||||
} catch (IOException ie) {
|
||||
LOG.error("Exception in converting values into bytes "
|
||||
+ ie.getMessage());
|
||||
throw new SQLException(ie);
|
||||
}
|
||||
} else {
|
||||
ps.setString(idx++, value.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
return idx;
|
||||
}
|
||||
|
||||
private static <K, V> int setBytesForColumnFamily(
|
||||
PreparedStatement ps, Map<K, V> keyValues, int startPos)
|
||||
throws SQLException {
|
||||
return setValuesForColumnFamily(ps, keyValues, startPos, true);
|
||||
}
|
||||
|
||||
private static <K, V> int setStringsForColumnFamily(
|
||||
PreparedStatement ps, Map<K, V> keyValues, int startPos)
|
||||
throws SQLException {
|
||||
return setValuesForColumnFamily(ps, keyValues, startPos, false);
|
||||
}
|
||||
|
||||
private static int setStringsForPrimaryKey(PreparedStatement ps,
|
||||
TimelineCollectorContext context, TimelineEntity entity, int startPos)
|
||||
throws SQLException {
|
||||
int idx = startPos;
|
||||
ps.setString(idx++, context.getClusterId());
|
||||
ps.setString(idx++, context.getUserId());
|
||||
ps.setString(idx++,
|
||||
context.getFlowName());
|
||||
ps.setString(idx++, context.getFlowVersion());
|
||||
ps.setLong(idx++, context.getFlowRunId());
|
||||
ps.setString(idx++, context.getAppId());
|
||||
ps.setString(idx++, entity.getType());
|
||||
ps.setString(idx++, entity.getId());
|
||||
return idx;
|
||||
}
|
||||
|
||||
private static void storeEntityVariableLengthFields(TimelineEntity entity,
|
||||
TimelineCollectorContext context, Connection conn) throws SQLException {
|
||||
int numPlaceholders = 0;
|
||||
StringBuilder columnDefs = new StringBuilder(
|
||||
StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
|
||||
if (entity.getConfigs() != null) {
|
||||
Set<String> keySet = entity.getConfigs().keySet();
|
||||
appendColumnsSQL(columnDefs, new DynamicColumns<>(
|
||||
CONFIG_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
|
||||
keySet));
|
||||
numPlaceholders += keySet.size();
|
||||
}
|
||||
if (entity.getInfo() != null) {
|
||||
Set<String> keySet = entity.getInfo().keySet();
|
||||
appendColumnsSQL(columnDefs, new DynamicColumns<>(
|
||||
INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
|
||||
keySet));
|
||||
numPlaceholders += keySet.size();
|
||||
}
|
||||
if (entity.getIsRelatedToEntities() != null) {
|
||||
Set<String> keySet = entity.getIsRelatedToEntities().keySet();
|
||||
appendColumnsSQL(columnDefs, new DynamicColumns<>(
|
||||
IS_RELATED_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
|
||||
keySet));
|
||||
numPlaceholders += keySet.size();
|
||||
}
|
||||
if (entity.getRelatesToEntities() != null) {
|
||||
Set<String> keySet = entity.getRelatesToEntities().keySet();
|
||||
appendColumnsSQL(columnDefs, new DynamicColumns<>(
|
||||
RELATES_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
|
||||
keySet));
|
||||
numPlaceholders += keySet.size();
|
||||
}
|
||||
if (numPlaceholders == 0) {
|
||||
return;
|
||||
}
|
||||
StringBuilder placeholders = new StringBuilder();
|
||||
placeholders.append(
|
||||
StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length));
|
||||
// numPlaceholders >= 1 now
|
||||
placeholders.append("?")
|
||||
.append(StringUtils.repeat(",?", numPlaceholders - 1));
|
||||
String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
|
||||
.append(ENTITY_TABLE_NAME).append(" (").append(columnDefs)
|
||||
.append(") VALUES(").append(placeholders).append(")").toString();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("SQL statement for variable length fields: "
|
||||
+ sqlVariableLengthFields);
|
||||
}
|
||||
// Use try with resource statement for the prepared statement
|
||||
try (PreparedStatement psVariableLengthFields =
|
||||
conn.prepareStatement(sqlVariableLengthFields)) {
|
||||
int idx = setStringsForPrimaryKey(
|
||||
psVariableLengthFields, context, entity, 1);
|
||||
if (entity.getConfigs() != null) {
|
||||
idx = setStringsForColumnFamily(
|
||||
psVariableLengthFields, entity.getConfigs(), idx);
|
||||
}
|
||||
if (entity.getInfo() != null) {
|
||||
idx = setBytesForColumnFamily(
|
||||
psVariableLengthFields, entity.getInfo(), idx);
|
||||
}
|
||||
if (entity.getIsRelatedToEntities() != null) {
|
||||
idx = setStringsForColumnFamily(
|
||||
psVariableLengthFields, entity.getIsRelatedToEntities(), idx);
|
||||
}
|
||||
if (entity.getRelatesToEntities() != null) {
|
||||
idx = setStringsForColumnFamily(
|
||||
psVariableLengthFields, entity.getRelatesToEntities(), idx);
|
||||
}
|
||||
psVariableLengthFields.execute();
|
||||
}
|
||||
}
|
||||
|
||||
private static void storeMetrics(TimelineEntity entity,
|
||||
TimelineCollectorContext context, Connection conn) throws SQLException {
|
||||
if (entity.getMetrics() == null) {
|
||||
return;
|
||||
}
|
||||
Set<TimelineMetric> metrics = entity.getMetrics();
|
||||
for (TimelineMetric metric : metrics) {
|
||||
StringBuilder sqlColumns = new StringBuilder(
|
||||
StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
|
||||
sqlColumns.append(",")
|
||||
.append(StringUtils.join(TIMELINE_METRIC_EXTRA_PK_LIST, ","));
|
||||
sqlColumns.append(",").append("singledata, time");
|
||||
StringBuilder placeholders = new StringBuilder();
|
||||
placeholders.append(
|
||||
StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
|
||||
.append(StringUtils.repeat("?,", TIMELINE_METRIC_EXTRA_PK_LIST.length));
|
||||
placeholders.append("?, ?");
|
||||
String sqlMetric = new StringBuilder("UPSERT INTO ")
|
||||
.append(METRIC_TABLE_NAME).append(" (").append(sqlColumns)
|
||||
.append(") VALUES(").append(placeholders).append(")").toString();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("SQL statement for metric: " + sqlMetric);
|
||||
}
|
||||
try (PreparedStatement psMetrics = conn.prepareStatement(sqlMetric)) {
|
||||
if (metric.getType().equals(TimelineMetric.Type.TIME_SERIES)) {
|
||||
LOG.warn("The incoming timeline metric contains time series data, "
|
||||
+ "which is currently not supported by Phoenix storage. "
|
||||
+ "Time series will be truncated. ");
|
||||
}
|
||||
int idx = setStringsForPrimaryKey(psMetrics, context, entity, 1);
|
||||
psMetrics.setString(idx++, metric.getId());
|
||||
Iterator<Map.Entry<Long, Number>> currNumIter =
|
||||
metric.getValues().entrySet().iterator();
|
||||
if (currNumIter.hasNext()) {
|
||||
// TODO: support time series storage
|
||||
Map.Entry<Long, Number> currEntry = currNumIter.next();
|
||||
psMetrics.setBytes(idx++,
|
||||
GenericObjectMapper.write(currEntry.getValue()));
|
||||
psMetrics.setLong(idx++, currEntry.getKey());
|
||||
} else {
|
||||
psMetrics.setBytes(idx++, GenericObjectMapper.write(null));
|
||||
LOG.warn("The incoming metric contains an empty value set. ");
|
||||
}
|
||||
psMetrics.execute();
|
||||
} catch (IOException ie) {
|
||||
LOG.error("Exception on converting single data to bytes: "
|
||||
+ ie.getMessage());
|
||||
throw new SQLException(ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void storeEvents(TimelineEntity entity,
|
||||
TimelineCollectorContext context, Connection conn) throws SQLException {
|
||||
if (entity.getEvents() == null) {
|
||||
return;
|
||||
}
|
||||
Set<TimelineEvent> events = entity.getEvents();
|
||||
for (TimelineEvent event : events) {
|
||||
// We need this number to check if the incoming event's info field is empty
|
||||
int numPlaceholders = 0;
|
||||
StringBuilder sqlColumns = new StringBuilder(
|
||||
StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
|
||||
sqlColumns.append(",")
|
||||
.append(StringUtils.join(TIMELINE_EVENT_EXTRA_PK_LIST, ","));
|
||||
appendColumnsSQL(sqlColumns, new DynamicColumns<>(
|
||||
EVENT_INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
|
||||
event.getInfo().keySet()));
|
||||
numPlaceholders += event.getInfo().keySet().size();
|
||||
if (numPlaceholders == 0) {
|
||||
continue;
|
||||
}
|
||||
StringBuilder placeholders = new StringBuilder();
|
||||
placeholders.append(
|
||||
StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
|
||||
.append(StringUtils.repeat("?,", TIMELINE_EVENT_EXTRA_PK_LIST.length));
|
||||
// numPlaceholders >= 1 now
|
||||
placeholders.append("?")
|
||||
.append(StringUtils.repeat(",?", numPlaceholders - 1));
|
||||
String sqlEvents = new StringBuilder("UPSERT INTO ")
|
||||
.append(EVENT_TABLE_NAME).append(" (").append(sqlColumns)
|
||||
.append(") VALUES(").append(placeholders).append(")").toString();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("SQL statement for events: " + sqlEvents);
|
||||
}
|
||||
try (PreparedStatement psEvent = conn.prepareStatement(sqlEvents)) {
|
||||
int idx = setStringsForPrimaryKey(psEvent, context, entity, 1);
|
||||
psEvent.setLong(idx++, event.getTimestamp());
|
||||
psEvent.setString(idx++, event.getId());
|
||||
setBytesForColumnFamily(psEvent, event.getInfo(), idx);
|
||||
psEvent.execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WARNING: This method will permanently drop a table!
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
void dropTable(String tableName) throws Exception {
|
||||
try (Connection conn = getConnection();
|
||||
Statement stmt = conn.createStatement()) {
|
||||
String sql = "DROP TABLE " + tableName;
|
||||
stmt.executeUpdate(sql);
|
||||
} catch (SQLException se) {
|
||||
LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
|
||||
throw se;
|
||||
}
|
||||
}
|
||||
}
|
|
@@ -18,6 +18,8 @@
|
|||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.CommandLineParser;
|
||||
|
@@ -51,6 +53,7 @@ public class TimelineSchemaCreator {
|
|||
|
||||
final static String NAME = TimelineSchemaCreator.class.getSimpleName();
|
||||
private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
|
||||
private static final String PHOENIX_OPTION_SHORT = "p";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
|
@@ -83,7 +86,41 @@ public class TimelineSchemaCreator {
|
|||
hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
|
||||
applicationTableName);
|
||||
}
|
||||
createAllTables(hbaseConf);
|
||||
|
||||
List<Exception> exceptions = new ArrayList<>();
|
||||
try {
|
||||
createAllTables(hbaseConf);
|
||||
LOG.info("Successfully created HBase schema. ");
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error in creating hbase tables: " + e.getMessage());
|
||||
exceptions.add(e);
|
||||
}
|
||||
|
||||
// Create Phoenix data schema if needed
|
||||
if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) {
|
||||
Configuration phoenixConf = new Configuration();
|
||||
try {
|
||||
PhoenixOfflineAggregationWriterImpl phoenixWriter =
|
||||
new PhoenixOfflineAggregationWriterImpl();
|
||||
phoenixWriter.init(phoenixConf);
|
||||
phoenixWriter.start();
|
||||
phoenixWriter.createPhoenixTables();
|
||||
phoenixWriter.stop();
|
||||
LOG.info("Successfully created Phoenix offline aggregation schema. ");
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error in creating phoenix tables: " + e.getMessage());
|
||||
exceptions.add(e);
|
||||
}
|
||||
}
|
||||
if (exceptions.size() > 0) {
|
||||
LOG.warn("Schema creation finished with the following exceptions");
|
||||
for (Exception e : exceptions) {
|
||||
LOG.warn(e.getMessage());
|
||||
}
|
||||
System.exit(-1);
|
||||
} else {
|
||||
LOG.info("Schema creation finished successfully");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@@ -115,6 +152,12 @@ public class TimelineSchemaCreator {
|
|||
o.setRequired(false);
|
||||
options.addOption(o);
|
||||
|
||||
o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false,
|
||||
"create Phoenix offline aggregation tables");
|
||||
// No need to set arg name since we do not need an argument here
|
||||
o.setRequired(false);
|
||||
options.addOption(o);
|
||||
|
||||
CommandLineParser parser = new PosixParser();
|
||||
CommandLine commandLine = null;
|
||||
try {
|
||||
|
|
|
@@ -0,0 +1,110 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
|
||||
/**
 * Class to carry the offline aggregation information for storage level
 * implementations. There are currently two predefined aggregation info
 * instances that represent flow and user level offline aggregations. Depending
 * on its implementation, a storage class may use an OfflineAggregationInfo
 * object to decide behaviors dynamically.
 */
|
||||
public final class OfflineAggregationInfo {
|
||||
/**
|
||||
* Default flow level aggregation table name
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static final String FLOW_AGGREGATION_TABLE_NAME
|
||||
= "yarn_timeline_flow_aggregation";
|
||||
/**
|
||||
* Default user level aggregation table name
|
||||
*/
|
||||
public static final String USER_AGGREGATION_TABLE_NAME
|
||||
= "yarn_timeline_user_aggregation";
|
||||
|
||||
// These lists are not taking effects in table creations.
|
||||
private static final String[] FLOW_AGGREGATION_PK_LIST =
|
||||
{ "user", "cluster", "flow_name" };
|
||||
private static final String[] USER_AGGREGATION_PK_LIST = { "user", "cluster"};
|
||||
|
||||
private final String tableName;
|
||||
private final String[] primaryKeyList;
|
||||
private final PrimaryKeyStringSetter primaryKeyStringSetter;
|
||||
|
||||
private OfflineAggregationInfo(String table, String[] pkList,
|
||||
PrimaryKeyStringSetter formatter) {
|
||||
tableName = table;
|
||||
primaryKeyList = pkList;
|
||||
primaryKeyStringSetter = formatter;
|
||||
}
|
||||
|
||||
private interface PrimaryKeyStringSetter {
|
||||
int setValues(PreparedStatement ps, TimelineCollectorContext context,
|
||||
String[] extraInfo, int startPos) throws SQLException;
|
||||
}
|
||||
|
||||
public String getTableName() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
public String[] getPrimaryKeyList() {
|
||||
return primaryKeyList.clone();
|
||||
}
|
||||
|
||||
public int setStringsForPrimaryKey(PreparedStatement ps,
|
||||
TimelineCollectorContext context, String[] extraInfo, int startPos)
|
||||
throws SQLException {
|
||||
return primaryKeyStringSetter.setValues(ps, context, extraInfo, startPos);
|
||||
}
|
||||
|
||||
public static final OfflineAggregationInfo FLOW_AGGREGATION =
|
||||
new OfflineAggregationInfo(FLOW_AGGREGATION_TABLE_NAME,
|
||||
FLOW_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
|
||||
@Override
|
||||
public int setValues(PreparedStatement ps,
|
||||
TimelineCollectorContext context, String[] extraInfo, int startPos)
|
||||
throws SQLException {
|
||||
int idx = startPos;
|
||||
ps.setString(idx++, context.getUserId());
|
||||
ps.setString(idx++, context.getClusterId());
|
||||
ps.setString(idx++, context.getFlowName());
|
||||
return idx;
|
||||
}
|
||||
});
|
||||
|
||||
public static final OfflineAggregationInfo USER_AGGREGATION =
|
||||
new OfflineAggregationInfo(USER_AGGREGATION_TABLE_NAME,
|
||||
USER_AGGREGATION_PK_LIST, new PrimaryKeyStringSetter() {
|
||||
@Override
|
||||
public int setValues(PreparedStatement ps,
|
||||
TimelineCollectorContext context, String[] extraInfo, int startPos)
|
||||
throws SQLException {
|
||||
int idx = startPos;
|
||||
ps.setString(idx++, context.getUserId());
|
||||
ps.setString(idx++, context.getClusterId());
|
||||
return idx;
|
||||
}
|
||||
});
|
||||
}
|
|
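Tying this back to the Phoenix writer earlier in the patch, a condensed sketch of how a storage implementation consumes FLOW_AGGREGATION when preparing its flow-level UPSERT; conn, context and entity are assumed to be in scope:

    OfflineAggregationInfo info = OfflineAggregationInfo.FLOW_AGGREGATION;
    String sql = "UPSERT INTO " + info.getTableName()
        + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
        + ", created_time, modified_time, metric_names) VALUES ("
        + StringUtils.repeat("?,", info.getPrimaryKeyList().length) + "?, ?, ?)";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
      // Fills user, cluster and flow_name, then returns the next parameter index.
      int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
      ps.setLong(idx++, entity.getCreatedTime());
      ps.setLong(idx++, entity.getModifiedTime());
      ps.setString(idx, "HDFS_BYTES_READ");  // joined metric id list
      ps.execute();
    }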
@@ -25,14 +25,17 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import org.apache.hadoop.hbase.IntegrationTestingUtility;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
|
||||
import org.apache.phoenix.query.BaseTest;
|
||||
import org.apache.phoenix.query.QueryServices;
|
||||
import org.apache.phoenix.util.PropertiesUtil;
|
||||
import org.apache.phoenix.util.ReadOnlyProps;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
|
@@ -43,72 +46,36 @@ import java.util.Map;
|
|||
|
||||
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
|
||||
|
||||
public class TestPhoenixTimelineWriterImpl extends BaseTest {
|
||||
private static PhoenixTimelineWriterImpl writer;
|
||||
public class TestPhoenixOfflineAggregationWriterImpl extends BaseTest {
|
||||
private static PhoenixOfflineAggregationWriterImpl storage;
|
||||
private static final int BATCH_SIZE = 3;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
writer = setupPhoenixClusterAndWriterForTest(conf);
|
||||
storage = setupPhoenixClusterAndWriterForTest(conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 90000)
|
||||
public void testPhoenixWriterBasic() throws Exception {
|
||||
// Set up a list of timeline entities and write them back to Phoenix
|
||||
int numEntity = 12;
|
||||
TimelineEntities te =
|
||||
TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity);
|
||||
writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1", te);
|
||||
// Verify if we're storing all entities
|
||||
String sql = "SELECT COUNT(entity_id) FROM "
|
||||
+ PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME;
|
||||
verifySQLWithCount(sql, numEntity, "Number of entities should be ");
|
||||
// Check config (half of all entities)
|
||||
sql = "SELECT COUNT(c.config) FROM "
|
||||
+ PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(c.config VARCHAR) ";
|
||||
verifySQLWithCount(sql, (numEntity / 2),
|
||||
"Number of entities with config should be ");
|
||||
// Check info (half of all entities)
|
||||
sql = "SELECT COUNT(i.info1) FROM "
|
||||
+ PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(i.info1 VARBINARY) ";
|
||||
verifySQLWithCount(sql, (numEntity / 2),
|
||||
"Number of entities with info should be ");
|
||||
// Check config and info (a quarter of all entities)
|
||||
sql = "SELECT COUNT(entity_id) FROM "
|
||||
+ PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
|
||||
+ "(c.config VARCHAR, i.info1 VARBINARY) "
|
||||
+ "WHERE c.config IS NOT NULL AND i.info1 IS NOT NULL";
|
||||
verifySQLWithCount(sql, (numEntity / 4),
|
||||
"Number of entities with both config and info should be ");
|
||||
// Check relatesToEntities and isRelatedToEntities
|
||||
sql = "SELECT COUNT(entity_id) FROM "
|
||||
+ PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
|
||||
+ "(rt.testType VARCHAR, ir.testType VARCHAR) "
|
||||
+ "WHERE rt.testType IS NOT NULL AND ir.testType IS NOT NULL";
|
||||
verifySQLWithCount(sql, numEntity - 2,
|
||||
"Number of entities with both relatesTo and isRelatedTo should be ");
|
||||
// Check event
|
||||
sql = "SELECT COUNT(entity_id) FROM "
|
||||
+ PhoenixTimelineWriterImpl.EVENT_TABLE_NAME;
|
||||
verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
|
||||
// Check metrics
|
||||
sql = "SELECT COUNT(entity_id) FROM "
|
||||
+ PhoenixTimelineWriterImpl.METRIC_TABLE_NAME;
|
||||
verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
|
||||
public void testFlowLevelAggregationStorage() throws Exception {
|
||||
testAggregator(OfflineAggregationInfo.FLOW_AGGREGATION);
|
||||
}
|
||||
|
||||
@Test(timeout = 90000)
|
||||
public void testUserLevelAggregationStorage() throws Exception {
|
||||
testAggregator(OfflineAggregationInfo.USER_AGGREGATION);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanup() throws Exception {
|
||||
writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME);
|
||||
writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME);
|
||||
writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME);
|
||||
writer.serviceStop();
|
||||
storage.dropTable(OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME);
|
||||
storage.dropTable(OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME);
|
||||
tearDownMiniCluster();
|
||||
}
|
||||
|
||||
private static PhoenixTimelineWriterImpl setupPhoenixClusterAndWriterForTest(
|
||||
YarnConfiguration conf) throws Exception{
|
||||
private static PhoenixOfflineAggregationWriterImpl
|
||||
setupPhoenixClusterAndWriterForTest(YarnConfiguration conf)
|
||||
throws Exception{
|
||||
Map<String, String> props = new HashMap<>();
|
||||
// Must update config before starting server
|
||||
props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
|
||||
|
@@ -125,21 +92,64 @@ public class TestPhoenixTimelineWriterImpl extends BaseTest {
|
|||
// Must update config before starting server
|
||||
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
|
||||
|
||||
PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
|
||||
// Change connection settings for test
|
||||
conf.set(
|
||||
PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
|
||||
YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
|
||||
getUrl());
|
||||
myWriter.connProperties = PropertiesUtil.deepCopy(TEST_PROPERTIES);
|
||||
myWriter.serviceInit(conf);
|
||||
PhoenixOfflineAggregationWriterImpl
|
||||
myWriter = new PhoenixOfflineAggregationWriterImpl(TEST_PROPERTIES);
|
||||
myWriter.init(conf);
|
||||
myWriter.start();
|
||||
myWriter.createPhoenixTables();
|
||||
return myWriter;
|
||||
}
|
||||
|
||||
private static TimelineEntity getTestAggregationTimelineEntity() {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "hello1";
|
||||
String type = "testAggregationType";
|
||||
entity.setId(id);
|
||||
entity.setType(type);
|
||||
entity.setCreatedTime(1425016501000L);
|
||||
entity.setModifiedTime(1425016502000L);
|
||||
|
||||
TimelineMetric metric = new TimelineMetric();
|
||||
metric.setId("HDFS_BYTES_READ");
|
||||
metric.addValue(1425016501100L, 8000);
|
||||
entity.addMetric(metric);
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
private void testAggregator(OfflineAggregationInfo aggregationInfo)
|
||||
throws Exception {
|
||||
// Set up a list of timeline entities and write them back to Phoenix
|
||||
int numEntity = 1;
|
||||
TimelineEntities te = new TimelineEntities();
|
||||
te.addEntity(getTestAggregationTimelineEntity());
|
||||
TimelineCollectorContext context = new TimelineCollectorContext("cluster_1",
|
||||
"user1", "testFlow", null, 0, null);
|
||||
storage.writeAggregatedEntity(context, te,
|
||||
aggregationInfo);
|
||||
|
||||
// Verify if we're storing all entities
|
||||
String[] primaryKeyList = aggregationInfo.getPrimaryKeyList();
|
||||
String sql = "SELECT COUNT(" + primaryKeyList[primaryKeyList.length - 1]
|
||||
+") FROM " + aggregationInfo.getTableName();
|
||||
verifySQLWithCount(sql, numEntity, "Number of entities should be ");
|
||||
// Check metric
|
||||
sql = "SELECT COUNT(m.HDFS_BYTES_READ) FROM "
|
||||
+ aggregationInfo.getTableName() + "(m.HDFS_BYTES_READ VARBINARY) ";
|
||||
verifySQLWithCount(sql, numEntity,
|
||||
"Number of entities with info should be ");
|
||||
}
|
||||
|
||||
|
||||
private void verifySQLWithCount(String sql, int targetCount, String message)
|
||||
throws Exception {
|
||||
try (
|
||||
Statement stmt =
|
||||
writer.getConnection().createStatement();
|
||||
storage.getConnection().createStatement();
|
||||
ResultSet rs = stmt.executeQuery(sql)) {
|
||||
assertTrue("Result set empty on statement " + sql, rs.next());
|
||||
assertNotNull("Fail to execute query " + sql, rs);
|
|
@@ -1,74 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
|
||||
public class TestTimelineWriterImpl {
|
||||
static TimelineEntities getStandardTestTimelineEntities(int listSize) {
|
||||
TimelineEntities te = new TimelineEntities();
|
||||
for (int i = 0; i < listSize; i++) {
|
||||
TimelineEntity entity = new TimelineEntity();
|
||||
String id = "hello" + i;
|
||||
String type = "testType";
|
||||
entity.setId(id);
|
||||
entity.setType(type);
|
||||
entity.setCreatedTime(1425016501000L + i);
|
||||
entity.setModifiedTime(1425016502000L + i);
|
||||
if (i > 0) {
|
||||
entity.addRelatesToEntity(type, "hello" + i);
|
||||
entity.addRelatesToEntity(type, "hello" + (i - 1));
|
||||
}
|
||||
if (i < listSize - 1) {
|
||||
entity.addIsRelatedToEntity(type, "hello" + i);
|
||||
entity.addIsRelatedToEntity(type, "hello" + (i + 1));
|
||||
}
|
||||
int category = i % 4;
|
||||
switch (category) {
|
||||
case 0:
|
||||
entity.addConfig("config", "config" + i);
|
||||
// Fall through deliberately
|
||||
case 1:
|
||||
entity.addInfo("info1", new Integer(i));
|
||||
entity.addInfo("info2", "helloworld");
|
||||
// Fall through deliberately
|
||||
case 2:
|
||||
break;
|
||||
case 3:
|
||||
entity.addConfig("config", "config" + i);
|
||||
TimelineEvent event = new TimelineEvent();
|
||||
event.setId("test event");
|
||||
event.setTimestamp(1425016501100L + i);
|
||||
event.addInfo("test_info", "content for " + entity.getId());
|
||||
event.addInfo("test_info1", new Integer(i));
|
||||
entity.addEvent(event);
|
||||
TimelineMetric metric = new TimelineMetric();
|
||||
metric.setId("HDFS_BYTES_READ");
|
||||
metric.addValue(1425016501100L + i, 8000 + i);
|
||||
entity.addMetric(metric);
|
||||
break;
|
||||
}
|
||||
te.addEntity(entity);
|
||||
}
|
||||
return te;
|
||||
}
|
||||
}
|