YARN-8499 ATSv2 Generalize TimelineStorageMonitor.
Contributed by Prabhu Joseph
commit cda9f33745 (parent c7554ffd5c)
TestTimelineReaderHBaseDown.java
@@ -34,8 +34,8 @@ import java.util.Set;
 import java.util.concurrent.TimeoutException;

 import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS;
-import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.DATA_TO_RETRIEVE;
-import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.MONITOR_FILTERS;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.DATA_TO_RETRIEVE;
+import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.MONITOR_FILTERS;

 public class TestTimelineReaderHBaseDown {
HBaseStorageMonitor.java (new file)
@@ -0,0 +1,90 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;

/**
 * HBase based implementation for {@link TimelineStorageMonitor}.
 */
public class HBaseStorageMonitor extends TimelineStorageMonitor {

  protected static final TimelineEntityFilters MONITOR_FILTERS =
      new TimelineEntityFilters.Builder().entityLimit(1L).build();
  protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
      new TimelineDataToRetrieve(null, null, null, null, null, null);

  private Configuration monitorHBaseConf;
  private Connection monitorConn;
  private TimelineEntityReader reader;

  public HBaseStorageMonitor(Configuration conf) throws Exception {
    super(conf, Storage.HBase);
    this.initialize(conf);
  }

  private void initialize(Configuration conf) throws Exception {
    monitorHBaseConf = HBaseTimelineStorageUtils.
        getTimelineServiceHBaseConf(conf);
    monitorHBaseConf.setInt("hbase.client.retries.number", 3);
    monitorHBaseConf.setLong("hbase.client.pause", 1000);
    long monitorInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS
    );
    monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
    monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
        monitorInterval);
    monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
    monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);

    String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID,
        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
    TimelineReaderContext monitorContext =
        new TimelineReaderContext(clusterId, null, null, null, null,
            TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
    reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(
        monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
  }

  @Override
  public void healthCheck() throws Exception {
    reader.readEntities(monitorHBaseConf, monitorConn);
  }

  @Override
  public void start() {
    super.start();
  }

  @Override
  public void stop() throws Exception {
    super.stop();
    monitorConn.close();
  }
}
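A minimal standalone usage sketch of the monitor above. The driver class name (StorageMonitorProbe) and the 5-second interval are illustrative assumptions, not part of the commit; constructing HBaseStorageMonitor opens a real HBase connection, so this only runs against a reachable HBase/ZooKeeper quorum configured via the usual yarn-site.xml/hbase-site.xml.

// Hypothetical driver, sketch only: exercises the monitor API shown above.
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor;

public class StorageMonitorProbe {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    // Assumption for the sketch: poll every 5 seconds instead of the default.
    conf.setLong(
        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
        5000L);

    HBaseStorageMonitor monitor = new HBaseStorageMonitor(conf);
    monitor.start();                // schedules the background health check
    try {
      Thread.sleep(10000L);         // let a couple of health checks run
      monitor.checkStorageIsUp();   // throws IOException if HBase is marked down
      System.out.println("HBase storage down? " + monitor.isStorageDown());
    } finally {
      monitor.stop();               // stops the executor and closes the connection
    }
  }
}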
HBaseTimelineReaderImpl.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 import java.io.IOException;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -31,8 +27,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
@@ -54,12 +48,7 @@ public class HBaseTimelineReaderImpl

   private Configuration hbaseConf = null;
   private Connection conn;
-  private Configuration monitorHBaseConf = null;
-  private Connection monitorConn;
-  private ScheduledExecutorService monitorExecutorService;
-  private TimelineReaderContext monitorContext;
-  private long monitorInterval;
-  private AtomicBoolean hbaseDown = new AtomicBoolean();
+  private TimelineStorageMonitor storageMonitor;

   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -68,39 +57,15 @@ public class HBaseTimelineReaderImpl
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);

-    String clusterId = conf.get(
-        YarnConfiguration.RM_CLUSTER_ID,
-        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
-    monitorContext =
-        new TimelineReaderContext(clusterId, null, null, null, null,
-            TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null);
-    monitorInterval = conf.getLong(
-        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS);
-
-    monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
-    monitorHBaseConf.setInt("hbase.client.retries.number", 3);
-    monitorHBaseConf.setLong("hbase.client.pause", 1000);
-    monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval);
-    monitorHBaseConf.setLong("hbase.client.scanner.timeout.period",
-        monitorInterval);
-    monitorHBaseConf.setInt("zookeeper.recovery.retry", 1);
-    monitorConn = ConnectionFactory.createConnection(monitorHBaseConf);
-
-    monitorExecutorService = Executors.newScheduledThreadPool(1);
-
     hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
+    storageMonitor = new HBaseStorageMonitor(conf);
   }

   @Override
   protected void serviceStart() throws Exception {
     super.serviceStart();
-    LOG.info("Scheduling HBase liveness monitor at interval {}",
-        monitorInterval);
-    monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0,
-        monitorInterval, TimeUnit.MILLISECONDS);
+    storageMonitor.start();
   }

   @Override
@@ -109,31 +74,18 @@ public class HBaseTimelineReaderImpl
       LOG.info("closing the hbase Connection");
       conn.close();
     }
-    if (monitorExecutorService != null) {
-      monitorExecutorService.shutdownNow();
-      if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
-        LOG.warn("failed to stop the monitir task in time. " +
-            "will still proceed to close the monitor.");
-      }
-    }
-    monitorConn.close();
+    storageMonitor.stop();
     super.serviceStop();
   }

-  private void checkHBaseDown() throws IOException {
-    if (hbaseDown.get()) {
-      throw new IOException("HBase is down");
-    }
-  }
-
   public boolean isHBaseDown() {
-    return hbaseDown.get();
+    return storageMonitor.isStorageDown();
   }

   @Override
   public TimelineEntity getEntity(TimelineReaderContext context,
       TimelineDataToRetrieve dataToRetrieve) throws IOException {
-    checkHBaseDown();
+    storageMonitor.checkStorageIsUp();
     TimelineEntityReader reader =
         TimelineEntityReaderFactory.createSingleEntityReader(context,
             dataToRetrieve);
@@ -144,7 +96,7 @@ public class HBaseTimelineReaderImpl
   public Set<TimelineEntity> getEntities(TimelineReaderContext context,
       TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
       throws IOException {
-    checkHBaseDown();
+    storageMonitor.checkStorageIsUp();
     TimelineEntityReader reader =
         TimelineEntityReaderFactory.createMultipleEntitiesReader(context,
             filters, dataToRetrieve);
@@ -154,7 +106,7 @@ public class HBaseTimelineReaderImpl
   @Override
   public Set<String> getEntityTypes(TimelineReaderContext context)
       throws IOException {
-    checkHBaseDown();
+    storageMonitor.checkStorageIsUp();
     EntityTypeReader reader = new EntityTypeReader(context);
     return reader.readEntityTypes(hbaseConf, conn);
   }
@@ -171,30 +123,4 @@ public class HBaseTimelineReaderImpl
     }
   }
-
-  protected static final TimelineEntityFilters MONITOR_FILTERS =
-      new TimelineEntityFilters.Builder().entityLimit(1L).build();
-  protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE =
-      new TimelineDataToRetrieve(null, null, null, null, null, null);
-
-  private class HBaseMonitor implements Runnable {
-    @Override
-    public void run() {
-      try {
-        LOG.debug("Running HBase liveness monitor");
-        TimelineEntityReader reader =
-            TimelineEntityReaderFactory.createMultipleEntitiesReader(
-                monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE);
-        reader.readEntities(monitorHBaseConf, monitorConn);
-
-        // on success, reset hbase down flag
-        if (hbaseDown.getAndSet(false)) {
-          LOG.debug("HBase request succeeded, assuming HBase up");
-        }
-      } catch (Exception e) {
-        LOG.warn("Got failure attempting to read from timeline storage, " +
-            "assuming HBase down", e);
-        hbaseDown.getAndSet(true);
-      }
-    }
-  }
 }
TimelineStorageMonitor.java (new file)
@@ -0,0 +1,106 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

/**
 * This abstract class is for monitoring Health of Timeline Storage.
 */
public abstract class TimelineStorageMonitor {
  private static final Logger LOG = LoggerFactory
      .getLogger(TimelineStorageMonitor.class);

  /** Different Storages supported by ATSV2. */
  public enum Storage {
    HBase
  }

  private ScheduledExecutorService monitorExecutorService;
  private long monitorInterval;
  private Storage storage;
  private AtomicBoolean storageDown = new AtomicBoolean();

  public TimelineStorageMonitor(Configuration conf, Storage storage) {
    this.storage = storage;
    this.monitorInterval = conf.getLong(
        YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS,
        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS
    );
  }

  public void start() {
    LOG.info("Scheduling {} storage monitor at interval {}",
        this.storage, monitorInterval);
    monitorExecutorService = Executors.newScheduledThreadPool(1);
    monitorExecutorService.scheduleAtFixedRate(new MonitorThread(), 0,
        monitorInterval, TimeUnit.MILLISECONDS);
  }

  public void stop() throws Exception {
    if (monitorExecutorService != null) {
      monitorExecutorService.shutdownNow();
      if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
        LOG.warn("Failed to stop the monitor task in time. " +
            "will still proceed to close the monitor.");
      }
    }
  }

  abstract public void healthCheck() throws Exception;

  public void checkStorageIsUp() throws IOException {
    if (storageDown.get()) {
      throw new IOException(storage + " is down");
    }
  }

  public boolean isStorageDown() {
    return storageDown.get();
  }

  private class MonitorThread implements Runnable {
    @Override
    public void run() {
      try {
        LOG.debug("Running Timeline Storage monitor");
        healthCheck();
        if (storageDown.getAndSet(false)) {
          LOG.debug("{} health check succeeded, " +
              "assuming storage is up", storage);
        }
      } catch (Exception e) {
        LOG.warn(String.format("Got failure attempting to read from %s, " +
            "assuming Storage is down", storage), e);
        storageDown.getAndSet(true);
      }
    }
  }

}
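To illustrate the generalization, here is a hedged sketch of what a monitor for another storage backend could look like. The class name (ExampleStorageMonitor), the configuration key, and the ping logic are hypothetical; the Storage enum in this commit only defines HBase, so a real implementation would also add its own enum value rather than reuse the HBase one as the placeholder below does.

// Hypothetical sketch only: shows the extension points of TimelineStorageMonitor.
package org.apache.hadoop.yarn.server.timelineservice.storage;

import org.apache.hadoop.conf.Configuration;

public class ExampleStorageMonitor extends TimelineStorageMonitor {

  private final String endpoint;

  public ExampleStorageMonitor(Configuration conf) {
    // Reuses the interval handling and scheduling from the abstract class.
    super(conf, Storage.HBase /* placeholder; a new Storage value would go here */);
    // Hypothetical configuration key for the sketch.
    this.endpoint = conf.get("yarn.timeline-service.example.endpoint", "localhost");
  }

  @Override
  public void healthCheck() throws Exception {
    // Any cheap read against the backing store belongs here: throwing marks the
    // storage as down, returning normally marks it back up (see MonitorThread).
    if (!ping(endpoint)) {
      throw new Exception("Example storage at " + endpoint + " is unreachable");
    }
  }

  private boolean ping(String target) {
    return true; // stand-in for a real liveness probe against the backend
  }
}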