diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java index 786f529a7aa..e738d3971d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java @@ -34,8 +34,8 @@ import java.util.concurrent.TimeoutException; import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS; -import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.DATA_TO_RETRIEVE; -import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.MONITOR_FILTERS; +import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.DATA_TO_RETRIEVE; +import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseStorageMonitor.MONITOR_FILTERS; public class TestTimelineReaderHBaseDown { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java new file mode 100644 index 00000000000..c433aa66bec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseStorageMonitor.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory; + +/** + * HBase based implementation for {@link TimelineStorageMonitor}. + */ +public class HBaseStorageMonitor extends TimelineStorageMonitor { + + protected static final TimelineEntityFilters MONITOR_FILTERS = + new TimelineEntityFilters.Builder().entityLimit(1L).build(); + protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE = + new TimelineDataToRetrieve(null, null, null, null, null, null); + + private Configuration monitorHBaseConf; + private Connection monitorConn; + private TimelineEntityReader reader; + + public HBaseStorageMonitor(Configuration conf) throws Exception { + super(conf, Storage.HBase); + this.initialize(conf); + } + + private void initialize(Configuration conf) throws Exception { + monitorHBaseConf = HBaseTimelineStorageUtils. + getTimelineServiceHBaseConf(conf); + monitorHBaseConf.setInt("hbase.client.retries.number", 3); + monitorHBaseConf.setLong("hbase.client.pause", 1000); + long monitorInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS + ); + monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval); + monitorHBaseConf.setLong("hbase.client.scanner.timeout.period", + monitorInterval); + monitorHBaseConf.setInt("zookeeper.recovery.retry", 1); + monitorConn = ConnectionFactory.createConnection(monitorHBaseConf); + + String clusterId = conf.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + TimelineReaderContext monitorContext = + new TimelineReaderContext(clusterId, null, null, null, null, + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null); + reader = TimelineEntityReaderFactory.createMultipleEntitiesReader( + monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE); + } + + @Override + public void healthCheck() throws Exception { + reader.readEntities(monitorHBaseConf, monitorConn); + } + + @Override + public void start() { + super.start(); + } + + @Override + public void stop() throws Exception { + super.stop(); + monitorConn.close(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 653126e1008..4c71fd6b49e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -20,10 +20,6 @@ import java.io.IOException; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; @@ -31,8 +27,6 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; @@ -54,12 +48,7 @@ public class HBaseTimelineReaderImpl private Configuration hbaseConf = null; private Connection conn; - private Configuration monitorHBaseConf = null; - private Connection monitorConn; - private ScheduledExecutorService monitorExecutorService; - private TimelineReaderContext monitorContext; - private long monitorInterval; - private AtomicBoolean hbaseDown = new AtomicBoolean(); + private TimelineStorageMonitor storageMonitor; public HBaseTimelineReaderImpl() { super(HBaseTimelineReaderImpl.class.getName()); @@ -68,39 +57,15 @@ public HBaseTimelineReaderImpl() { @Override public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - - String clusterId = conf.get( - YarnConfiguration.RM_CLUSTER_ID, - YarnConfiguration.DEFAULT_RM_CLUSTER_ID); - monitorContext = - new TimelineReaderContext(clusterId, null, null, null, null, - TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null); - monitorInterval = conf.getLong( - YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS); - - monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); - monitorHBaseConf.setInt("hbase.client.retries.number", 3); - monitorHBaseConf.setLong("hbase.client.pause", 1000); - monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval); - monitorHBaseConf.setLong("hbase.client.scanner.timeout.period", - monitorInterval); - monitorHBaseConf.setInt("zookeeper.recovery.retry", 1); - monitorConn = ConnectionFactory.createConnection(monitorHBaseConf); - - monitorExecutorService = Executors.newScheduledThreadPool(1); - hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); conn = ConnectionFactory.createConnection(hbaseConf); + storageMonitor = new HBaseStorageMonitor(conf); } @Override protected void serviceStart() throws Exception { super.serviceStart(); - LOG.info("Scheduling HBase liveness monitor at interval {}", - monitorInterval); - monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0, - monitorInterval, TimeUnit.MILLISECONDS); + storageMonitor.start(); } @Override @@ -109,31 +74,18 @@ protected void serviceStop() throws Exception { LOG.info("closing the hbase Connection"); conn.close(); } - if (monitorExecutorService != null) { - monitorExecutorService.shutdownNow(); - if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) { - LOG.warn("failed to stop the monitir task in time. " + - "will still proceed to close the monitor."); - } - } - monitorConn.close(); + storageMonitor.stop(); super.serviceStop(); } - private void checkHBaseDown() throws IOException { - if (hbaseDown.get()) { - throw new IOException("HBase is down"); - } - } - public boolean isHBaseDown() { - return hbaseDown.get(); + return storageMonitor.isStorageDown(); } @Override public TimelineEntity getEntity(TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageIsUp(); TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(context, dataToRetrieve); @@ -144,7 +96,7 @@ public TimelineEntity getEntity(TimelineReaderContext context, public Set getEntities(TimelineReaderContext context, TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageIsUp(); TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(context, filters, dataToRetrieve); @@ -154,7 +106,7 @@ public Set getEntities(TimelineReaderContext context, @Override public Set getEntityTypes(TimelineReaderContext context) throws IOException { - checkHBaseDown(); + storageMonitor.checkStorageIsUp(); EntityTypeReader reader = new EntityTypeReader(context); return reader.readEntityTypes(hbaseConf, conn); } @@ -171,30 +123,4 @@ public TimelineHealth getHealthStatus() { } } - protected static final TimelineEntityFilters MONITOR_FILTERS = - new TimelineEntityFilters.Builder().entityLimit(1L).build(); - protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE = - new TimelineDataToRetrieve(null, null, null, null, null, null); - - private class HBaseMonitor implements Runnable { - @Override - public void run() { - try { - LOG.debug("Running HBase liveness monitor"); - TimelineEntityReader reader = - TimelineEntityReaderFactory.createMultipleEntitiesReader( - monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE); - reader.readEntities(monitorHBaseConf, monitorConn); - - // on success, reset hbase down flag - if (hbaseDown.getAndSet(false)) { - LOG.debug("HBase request succeeded, assuming HBase up"); - } - } catch (Exception e) { - LOG.warn("Got failure attempting to read from timeline storage, " + - "assuming HBase down", e); - hbaseDown.getAndSet(true); - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java new file mode 100644 index 00000000000..fc96f19d75e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * This abstract class is for monitoring Health of Timeline Storage. + */ +public abstract class TimelineStorageMonitor { + private static final Logger LOG = LoggerFactory + .getLogger(TimelineStorageMonitor.class); + + /** Different Storages supported by ATSV2. */ + public enum Storage { + HBase + } + + private ScheduledExecutorService monitorExecutorService; + private long monitorInterval; + private Storage storage; + private AtomicBoolean storageDown = new AtomicBoolean(); + + public TimelineStorageMonitor(Configuration conf, Storage storage) { + this.storage = storage; + this.monitorInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS + ); + } + + public void start() { + LOG.info("Scheduling {} storage monitor at interval {}", + this.storage, monitorInterval); + monitorExecutorService = Executors.newScheduledThreadPool(1); + monitorExecutorService.scheduleAtFixedRate(new MonitorThread(), 0, + monitorInterval, TimeUnit.MILLISECONDS); + } + + public void stop() throws Exception { + if (monitorExecutorService != null) { + monitorExecutorService.shutdownNow(); + if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.warn("Failed to stop the monitor task in time. " + + "will still proceed to close the monitor."); + } + } + } + + abstract public void healthCheck() throws Exception; + + public void checkStorageIsUp() throws IOException { + if (storageDown.get()) { + throw new IOException(storage + " is down"); + } + } + + public boolean isStorageDown() { + return storageDown.get(); + } + + private class MonitorThread implements Runnable { + @Override + public void run() { + try { + LOG.debug("Running Timeline Storage monitor"); + healthCheck(); + if (storageDown.getAndSet(false)) { + LOG.debug("{} health check succeeded, " + + "assuming storage is up", storage); + } + } catch (Exception e) { + LOG.warn(String.format("Got failure attempting to read from %s, " + + "assuming Storage is down", storage), e); + storageDown.getAndSet(true); + } + } + } + +}