YARN-9374. Improve Timeline service resilience when HBase is unavailable.
Contributed by Prabhu Joseph and Szilard Nemeth
This commit is contained in:
parent
02779cdc3a
commit
b87a727ff4
|
@ -150,7 +150,14 @@ public class TestTimelineReaderHBaseDown {
|
||||||
waitForHBaseDown(htr);
|
waitForHBaseDown(htr);
|
||||||
|
|
||||||
util.startMiniHBaseCluster(1, 1);
|
util.startMiniHBaseCluster(1, 1);
|
||||||
GenericTestUtils.waitFor(() -> !htr.isHBaseDown(), 1000, 150000);
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
htr.getTimelineStorageMonitor().checkStorageIsUp();
|
||||||
|
return true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 1000, 150000);
|
||||||
} finally {
|
} finally {
|
||||||
util.shutdownMiniCluster();
|
util.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
@ -158,8 +165,15 @@ public class TestTimelineReaderHBaseDown {
|
||||||
|
|
||||||
private static void waitForHBaseDown(HBaseTimelineReaderImpl htr) throws
|
private static void waitForHBaseDown(HBaseTimelineReaderImpl htr) throws
|
||||||
TimeoutException, InterruptedException {
|
TimeoutException, InterruptedException {
|
||||||
GenericTestUtils.waitFor(() -> htr.isHBaseDown(), 1000, 150000);
|
|
||||||
try {
|
try {
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
htr.getTimelineStorageMonitor().checkStorageIsUp();
|
||||||
|
return false;
|
||||||
|
} catch (IOException e) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}, 1000, 150000);
|
||||||
checkQuery(htr);
|
checkQuery(htr);
|
||||||
Assert.fail("Query should fail when HBase is down");
|
Assert.fail("Query should fail when HBase is down");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class tests HbaseTimelineWriter with Hbase Down.
|
||||||
|
*/
|
||||||
|
public class TestTimelineWriterHBaseDown {
|
||||||
|
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testTimelineWriterHBaseDown() throws Exception {
|
||||||
|
HBaseTestingUtility util = new HBaseTestingUtility();
|
||||||
|
HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl();
|
||||||
|
try {
|
||||||
|
Configuration c1 = util.getConfiguration();
|
||||||
|
c1.setLong(TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, 5000);
|
||||||
|
writer.init(c1);
|
||||||
|
writer.start();
|
||||||
|
|
||||||
|
util.startMiniCluster();
|
||||||
|
DataGeneratorForTest.createSchema(util.getConfiguration());
|
||||||
|
|
||||||
|
TimelineStorageMonitor storageMonitor = writer.
|
||||||
|
getTimelineStorageMonitor();
|
||||||
|
waitForHBaseToUp(storageMonitor);
|
||||||
|
|
||||||
|
try {
|
||||||
|
storageMonitor.checkStorageIsUp();
|
||||||
|
} catch(IOException e) {
|
||||||
|
Assert.fail("HBaseStorageMonitor failed to detect HBase Up");
|
||||||
|
}
|
||||||
|
|
||||||
|
util.shutdownMiniHBaseCluster();
|
||||||
|
waitForHBaseToDown(storageMonitor);
|
||||||
|
|
||||||
|
TimelineEntities te = new TimelineEntities();
|
||||||
|
ApplicationEntity entity = new ApplicationEntity();
|
||||||
|
String appId = "application_1000178881110_2002";
|
||||||
|
entity.setId(appId);
|
||||||
|
Long cTime = 1425016501000L;
|
||||||
|
entity.setCreatedTime(cTime);
|
||||||
|
te.addEntity(entity);
|
||||||
|
|
||||||
|
boolean exceptionCaught = false;
|
||||||
|
try{
|
||||||
|
writer.write(new TimelineCollectorContext("ATS1", "user1", "flow2",
|
||||||
|
"AB7822C10F1111", 1002345678919L, appId), te,
|
||||||
|
UserGroupInformation.createRemoteUser("user1"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (e.getMessage().equals("HBase is down")) {
|
||||||
|
exceptionCaught = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue("HBaseStorageMonitor failed to detect HBase Down",
|
||||||
|
exceptionCaught);
|
||||||
|
} finally {
|
||||||
|
writer.stop();
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitForHBaseToUp(TimelineStorageMonitor storageMonitor)
|
||||||
|
throws Exception {
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
storageMonitor.checkStorageIsUp();
|
||||||
|
return true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 1000, 150000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitForHBaseToDown(TimelineStorageMonitor storageMonitor)
|
||||||
|
throws Exception {
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
storageMonitor.checkStorageIsUp();
|
||||||
|
return false;
|
||||||
|
} catch (IOException e) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}, 1000, 150000);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -78,10 +78,6 @@ public class HBaseTimelineReaderImpl
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isHBaseDown() {
|
|
||||||
return storageMonitor.isStorageDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TimelineEntity getEntity(TimelineReaderContext context,
|
public TimelineEntity getEntity(TimelineReaderContext context,
|
||||||
TimelineDataToRetrieve dataToRetrieve) throws IOException {
|
TimelineDataToRetrieve dataToRetrieve) throws IOException {
|
||||||
|
@ -113,14 +109,19 @@ public class HBaseTimelineReaderImpl
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TimelineHealth getHealthStatus() {
|
public TimelineHealth getHealthStatus() {
|
||||||
if (!this.isHBaseDown()) {
|
try {
|
||||||
|
storageMonitor.checkStorageIsUp();
|
||||||
return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING,
|
return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING,
|
||||||
"");
|
"");
|
||||||
} else {
|
} catch (IOException e){
|
||||||
return new TimelineHealth(
|
return new TimelineHealth(
|
||||||
TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE,
|
TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE,
|
||||||
"HBase connection is down");
|
"HBase connection is down");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected TimelineStorageMonitor getTimelineStorageMonitor() {
|
||||||
|
return storageMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,6 +100,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
.getLogger(HBaseTimelineWriterImpl.class);
|
.getLogger(HBaseTimelineWriterImpl.class);
|
||||||
|
|
||||||
private Connection conn;
|
private Connection conn;
|
||||||
|
private TimelineStorageMonitor storageMonitor;
|
||||||
private TypedBufferedMutator<EntityTable> entityTable;
|
private TypedBufferedMutator<EntityTable> entityTable;
|
||||||
private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
|
private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
|
||||||
private TypedBufferedMutator<ApplicationTable> applicationTable;
|
private TypedBufferedMutator<ApplicationTable> applicationTable;
|
||||||
|
@ -150,9 +151,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
|
UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
|
||||||
UserGroupInformation.getLoginUser() :
|
UserGroupInformation.getLoginUser() :
|
||||||
UserGroupInformation.getCurrentUser();
|
UserGroupInformation.getCurrentUser();
|
||||||
|
storageMonitor = new HBaseStorageMonitor(conf);
|
||||||
LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi);
|
LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
super.serviceStart();
|
||||||
|
storageMonitor.start();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stores the entire information in TimelineEntities to the timeline store.
|
* Stores the entire information in TimelineEntities to the timeline store.
|
||||||
*/
|
*/
|
||||||
|
@ -160,7 +168,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
public TimelineWriteResponse write(TimelineCollectorContext context,
|
public TimelineWriteResponse write(TimelineCollectorContext context,
|
||||||
TimelineEntities data, UserGroupInformation callerUgi)
|
TimelineEntities data, UserGroupInformation callerUgi)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
storageMonitor.checkStorageIsUp();
|
||||||
TimelineWriteResponse putStatus = new TimelineWriteResponse();
|
TimelineWriteResponse putStatus = new TimelineWriteResponse();
|
||||||
|
|
||||||
String clusterId = context.getClusterId();
|
String clusterId = context.getClusterId();
|
||||||
|
@ -242,6 +250,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
public TimelineWriteResponse write(TimelineCollectorContext context,
|
public TimelineWriteResponse write(TimelineCollectorContext context,
|
||||||
TimelineDomain domain)
|
TimelineDomain domain)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
storageMonitor.checkStorageIsUp();
|
||||||
TimelineWriteResponse putStatus = new TimelineWriteResponse();
|
TimelineWriteResponse putStatus = new TimelineWriteResponse();
|
||||||
|
|
||||||
String clusterId = context.getClusterId();
|
String clusterId = context.getClusterId();
|
||||||
|
@ -591,6 +600,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
@Override
|
@Override
|
||||||
public TimelineWriteResponse aggregate(TimelineEntity data,
|
public TimelineWriteResponse aggregate(TimelineEntity data,
|
||||||
TimelineAggregationTrack track) throws IOException {
|
TimelineAggregationTrack track) throws IOException {
|
||||||
|
storageMonitor.checkStorageIsUp();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -603,6 +613,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
|
storageMonitor.checkStorageIsUp();
|
||||||
// flush all buffered mutators
|
// flush all buffered mutators
|
||||||
entityTable.flush();
|
entityTable.flush();
|
||||||
appToFlowTable.flush();
|
appToFlowTable.flush();
|
||||||
|
@ -653,6 +664,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
|
||||||
LOG.info("closing the hbase Connection");
|
LOG.info("closing the hbase Connection");
|
||||||
conn.close();
|
conn.close();
|
||||||
}
|
}
|
||||||
|
storageMonitor.stop();
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected TimelineStorageMonitor getTimelineStorageMonitor() {
|
||||||
|
return storageMonitor;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,10 +81,6 @@ public abstract class TimelineStorageMonitor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isStorageDown() {
|
|
||||||
return storageDown.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
private class MonitorThread implements Runnable {
|
private class MonitorThread implements Runnable {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
Loading…
Reference in New Issue