YARN-8499 ATSv2 Generalize TimelineStorageMonitor.

Contributed by Prabhu Joseph
This commit is contained in:
Eric Yang 2019-06-14 18:59:14 -04:00 committed by Rohith Sharma K S
parent 6110af2d1d
commit 02779cdc3a
4 changed files with 206 additions and 86 deletions

View File

@ -34,8 +34,8 @@ import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS; import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS;
import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.DATA_TO_RETRIEVE; import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.DATA_TO_RETRIEVE;
import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.MONITOR_FILTERS; import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.MONITOR_FILTERS;
public class TestTimelineReaderHBaseDown { public class TestTimelineReaderHBaseDown {

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
/**
 * HBase based implementation for {@link TimelineStorageMonitor}.
 */
public class HBaseStorageMonitor extends TimelineStorageMonitor {

  /** Probe filter: fetch at most one entity, the cheapest possible read. */
  protected static final TimelineEntityFilters MONITOR_FILTERS =
      new TimelineEntityFilters.Builder().entityLimit(1L).build();

  /** Retrieve no optional data fields; the probe only checks reachability. */
  protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
      new TimelineDataToRetrieve(null, null, null, null, null, null);

  private Configuration monitorHBaseConf;
  private Connection monitorConn;
  private TimelineEntityReader reader;

  public HBaseStorageMonitor(Configuration conf) throws Exception {
    super(conf, Storage.HBase);
    this.initialize(conf);
  }

  /**
   * Builds a dedicated HBase configuration and connection for health probes,
   * so that the aggressive retry/timeout settings used here do not affect the
   * main reader connection, and prepares a minimal flow-activity reader to
   * serve as the liveness probe.
   *
   * @param conf service configuration supplying the monitor interval and
   *     cluster id.
   * @throws Exception if the HBase connection cannot be created.
   */
  private void initialize(Configuration conf) throws Exception {
    monitorHBaseConf = HBaseTimelineStorageUtils.
        getTimelineServiceHBaseConf(conf);
    // Fail fast: few retries and short pauses so a down cluster is detected
    // within roughly one monitor interval.
    monitorHBaseConf.setInt("hbase.client.retries.number", 3);
    monitorHBaseConf.setLong("hbase.client.pause", 1000);
    long monitorInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS
    );
    // Cap each probe RPC/scan at the monitor interval so probes never overlap.
    monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
    monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
        monitorInterval);
    monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
    monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);

    String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID,
        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
    // Flow-activity table is always present, making it a reliable probe
    // target regardless of which applications have written data.
    TimelineReaderContext monitorContext =
        new TimelineReaderContext(clusterId, null, null, null, null,
            TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
    reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(
        monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
  }

  /**
   * Issues a minimal read against the flow-activity table; any thrown
   * exception is interpreted by the base class as storage being down.
   */
  @Override
  public void healthCheck() throws Exception {
    reader.readEntities(monitorHBaseConf, monitorConn);
  }

  /**
   * Stops the monitor thread and closes the dedicated probe connection.
   * The connection is closed even if stopping the monitor fails.
   */
  @Override
  public void stop() throws Exception {
    try {
      super.stop();
    } finally {
      // Always release the HBase connection, otherwise a failure in
      // super.stop() would leak it.
      monitorConn.close();
    }
  }
}

View File

@ -20,10 +20,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
@ -31,8 +27,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@ -54,12 +48,7 @@ public class HBaseTimelineReaderImpl
private Configuration hbaseConf = null; private Configuration hbaseConf = null;
private Connection conn; private Connection conn;
private Configuration monitorHBaseConf = null; private TimelineStorageMonitor storageMonitor;
private Connection monitorConn;
private ScheduledExecutorService monitorExecutorService;
private TimelineReaderContext monitorContext;
private long monitorInterval;
private AtomicBoolean hbaseDown = new AtomicBoolean();
public HBaseTimelineReaderImpl() { public HBaseTimelineReaderImpl() {
super(HBaseTimelineReaderImpl.class.getName()); super(HBaseTimelineReaderImpl.class.getName());
@ -68,39 +57,15 @@ public class HBaseTimelineReaderImpl
@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf); super.serviceInit(conf);
String clusterId = conf.get(
YarnConfiguration.RM_CLUSTER_ID,
YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
monitorContext =
new TimelineReaderContext(clusterId, null, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
monitorInterval = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
monitorHBaseConf.setInt("hbase.client.retries.number", 3);
monitorHBaseConf.setLong("hbase.client.pause", 1000);
monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
monitorInterval);
monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
monitorExecutorService = Executors.newScheduledThreadPool(1);
hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
conn = ConnectionFactory.createConnection(hbaseConf); conn = ConnectionFactory.createConnection(hbaseConf);
storageMonitor = new HBaseStorageMonitor(conf);
} }
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
super.serviceStart(); super.serviceStart();
LOG.info("Scheduling HBase liveness monitor at interval {}", storageMonitor.start();
monitorInterval);
monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0,
monitorInterval, TimeUnit.MILLISECONDS);
} }
@Override @Override
@ -109,31 +74,18 @@ public class HBaseTimelineReaderImpl
LOG.info("closing the hbase Connection"); LOG.info("closing the hbase Connection");
conn.close(); conn.close();
} }
if (monitorExecutorService != null) { storageMonitor.stop();
monitorExecutorService.shutdownNow();
if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
LOG.warn("failed to stop the monitir task in time. " +
"will still proceed to close the monitor.");
}
}
monitorConn.close();
super.serviceStop(); super.serviceStop();
} }
private void checkHBaseDown() throws IOException {
if (hbaseDown.get()) {
throw new IOException("HBase is down");
}
}
public boolean isHBaseDown() { public boolean isHBaseDown() {
return hbaseDown.get(); return storageMonitor.isStorageDown();
} }
@Override @Override
public TimelineEntity getEntity(TimelineReaderContext context, public TimelineEntity getEntity(TimelineReaderContext context,
TimelineDataToRetrieve dataToRetrieve) throws IOException { TimelineDataToRetrieve dataToRetrieve) throws IOException {
checkHBaseDown(); storageMonitor.checkStorageIsUp();
TimelineEntityReader reader = TimelineEntityReader reader =
TimelineEntityReaderFactory.createSingleEntityReader(context, TimelineEntityReaderFactory.createSingleEntityReader(context,
dataToRetrieve); dataToRetrieve);
@ -144,7 +96,7 @@ public class HBaseTimelineReaderImpl
public Set<TimelineEntity> getEntities(TimelineReaderContext context, public Set<TimelineEntity> getEntities(TimelineReaderContext context,
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
throws IOException { throws IOException {
checkHBaseDown(); storageMonitor.checkStorageIsUp();
TimelineEntityReader reader = TimelineEntityReader reader =
TimelineEntityReaderFactory.createMultipleEntitiesReader(context, TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
filters, dataToRetrieve); filters, dataToRetrieve);
@ -154,7 +106,7 @@ public class HBaseTimelineReaderImpl
@Override @Override
public Set<String> getEntityTypes(TimelineReaderContext context) public Set<String> getEntityTypes(TimelineReaderContext context)
throws IOException { throws IOException {
checkHBaseDown(); storageMonitor.checkStorageIsUp();
EntityTypeReader reader = new EntityTypeReader(context); EntityTypeReader reader = new EntityTypeReader(context);
return reader.readEntityTypes(hbaseConf, conn); return reader.readEntityTypes(hbaseConf, conn);
} }
@ -171,32 +123,4 @@ public class HBaseTimelineReaderImpl
} }
} }
protected static final TimelineEntityFilters MONITOR_FILTERS =
new TimelineEntityFilters.Builder().entityLimit(1L).build();
protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
new TimelineDataToRetrieve(null, null, null, null, null, null);
private class HBaseMonitor implements Runnable {
@Override
public void run() {
try {
LOG.info("Running HBase liveness monitor");
TimelineEntityReader reader =
TimelineEntityReaderFactory.createMultipleEntitiesReader(
monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
reader.readEntities(monitorHBaseConf, monitorConn);
// on success, reset hbase down flag
if (hbaseDown.getAndSet(false)) {
if(LOG.isDebugEnabled()) {
LOG.debug("HBase request succeeded, assuming HBase up");
}
}
} catch (Exception e) {
LOG.warn("Got failure attempting to read from timeline storage, " +
"assuming HBase down", e);
hbaseDown.getAndSet(true);
}
}
}
} }

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
 * This abstract class is for monitoring Health of Timeline Storage.
 * Subclasses implement {@link #healthCheck()} for a concrete backend; this
 * class schedules the check periodically and tracks an up/down flag that
 * readers consult via {@link #checkStorageIsUp()}.
 */
public abstract class TimelineStorageMonitor {
  private static final Logger LOG = LoggerFactory
      .getLogger(TimelineStorageMonitor.class);

  /** Different Storages supported by ATSV2. */
  public enum Storage {
    HBase
  }

  private ScheduledExecutorService monitorExecutorService;
  private final long monitorInterval;
  private final Storage storage;
  // Flipped by the monitor thread; read by reader threads, hence atomic.
  private final AtomicBoolean storageDown = new AtomicBoolean();

  /**
   * @param conf configuration supplying the monitor interval.
   * @param storage the backend this monitor watches, used in log/error text.
   */
  public TimelineStorageMonitor(Configuration conf, Storage storage) {
    this.storage = storage;
    this.monitorInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS
    );
  }

  /** Starts the periodic health-check task on a single-thread scheduler. */
  public void start() {
    LOG.info("Scheduling {} storage monitor at interval {}",
        this.storage, monitorInterval);
    monitorExecutorService = Executors.newScheduledThreadPool(1);
    monitorExecutorService.scheduleAtFixedRate(new MonitorThread(), 0,
        monitorInterval, TimeUnit.MILLISECONDS);
  }

  /**
   * Stops the health-check task, waiting up to 30 seconds for it to finish.
   *
   * @throws Exception if interrupted while awaiting termination.
   */
  public void stop() throws Exception {
    if (monitorExecutorService != null) {
      monitorExecutorService.shutdownNow();
      if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
        LOG.warn("Failed to stop the monitor task in time. " +
            "will still proceed to close the monitor.");
      }
    }
  }

  /**
   * Performs one backend-specific liveness probe.
   *
   * @throws Exception if the storage is unreachable; any exception marks
   *     the storage as down until a later probe succeeds.
   */
  public abstract void healthCheck() throws Exception;

  /**
   * @throws IOException if the last probe marked the storage as down.
   */
  public void checkStorageIsUp() throws IOException {
    if (storageDown.get()) {
      throw new IOException(storage + " is down");
    }
  }

  /** @return true if the last probe marked the storage as down. */
  public boolean isStorageDown() {
    return storageDown.get();
  }

  /** Periodic task: runs the probe and updates the up/down flag. */
  private class MonitorThread implements Runnable {
    @Override
    public void run() {
      try {
        LOG.debug("Running Timeline Storage monitor");
        healthCheck();
        // getAndSet detects the down->up transition so recovery is logged
        // exactly once.
        if (storageDown.getAndSet(false)) {
          LOG.debug("{} health check succeeded, " +
              "assuming storage is up", storage);
        }
      } catch (Exception e) {
        LOG.warn(String.format("Got failure attempting to read from %s, " +
            "assuming Storage is down", storage), e);
        // Result intentionally ignored; a plain set is sufficient here.
        storageDown.set(true);
      }
    }
  }
}