diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index d65ebdb628e..862f851fb59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -128,9 +128,13 @@ public class MembershipNamenodeResolver // Our cache depends on the store, update it first try { MembershipStore membership = getMembershipStore(); - membership.loadCache(force); + if (!membership.loadCache(force)) { + return false; + } DisabledNameserviceStore disabled = getDisabledNameserviceStore(); - disabled.loadCache(force); + if (!disabled.loadCache(force)) { + return false; + } } catch (IOException e) { LOG.error("Cannot update membership from the State Store", e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java index 2888d8cc501..1fdd4cdfba8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java @@ -398,7 +398,9 @@ public class MountTableResolver try { // Our cache depends on the store, update it first MountTableStore mountTable = this.getMountTableStore(); - mountTable.loadCache(force); + if (!mountTable.loadCache(force)) { + return false; + } GetMountTableEntriesRequest request = GetMountTableEntriesRequest.newInstance("/"); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java index 2b693aa936f..6fea9b9946d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java @@ -100,7 +100,7 @@ public abstract class CachedRecordStore * @throws StateStoreUnavailableException If the cache is not initialized. */ private void checkCacheAvailable() throws StateStoreUnavailableException { - if (!this.initialized) { + if (!getDriver().isDriverReady() || !this.initialized) { throw new StateStoreUnavailableException( "Cached State Store not initialized, " + getRecordClass().getSimpleName() + " records not valid"); @@ -125,7 +125,6 @@ public abstract class CachedRecordStore } catch (IOException e) { LOG.error("Cannot get \"{}\" records from the State Store", getRecordClass().getSimpleName()); - this.initialized = false; return false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java index a63a0f3b3ab..5d22b77afe2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java @@ -185,7 +185,9 @@ public class MembershipStoreImpl @Override public boolean loadCache(boolean force) throws IOException { - super.loadCache(force); + if (!super.loadCache(force)) { + return false; + } // Update local cache atomically cacheWriteLock.lock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java new file mode 100644 index 00000000000..57185a0a600 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A mock StateStoreDriver that runs in memory that can force IOExceptions + * upon demand. + */ +public class MockStateStoreDriver extends StateStoreBaseImpl { + private boolean giveErrors = false; + private boolean initialized = false; + private final Map> valueMap = new HashMap<>(); + + @Override + public boolean initDriver() { + initialized = true; + return true; + } + + @Override + public boolean initRecordStorage(String className, + Class clazz) { + return true; + } + + @Override + public boolean isDriverReady() { + return initialized; + } + + @Override + public void close() throws Exception { + valueMap.clear(); + initialized = false; + } + + /** + * Should this object throw an IOException on each following call? + * @param value should we throw errors? + */ + public void setGiveErrors(boolean value) { + giveErrors = value; + } + + /** + * Check to see if this StateStore should throw IOException on each call. + * @throws IOException thrown if giveErrors has been set + */ + private void checkErrors() throws IOException { + if (giveErrors) { + throw new IOException("Induced errors"); + } + } + + @Override + @SuppressWarnings("unchecked") + public QueryResult get(Class clazz) throws IOException { + checkErrors(); + Map map = valueMap.get(StateStoreUtils.getRecordName(clazz)); + List results = + map != null ? new ArrayList<>((Collection) map.values()) : new ArrayList<>(); + return new QueryResult<>(results, System.currentTimeMillis()); + } + + @Override + public boolean putAll(List records, + boolean allowUpdate, + boolean errorIfExists) + throws IOException { + checkErrors(); + for (T record : records) { + Map map = + valueMap.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()), + k -> new HashMap<>()); + String key = record.getPrimaryKey(); + BaseRecord oldRecord = map.get(key); + if (oldRecord == null || allowUpdate) { + map.put(key, record); + } else if (errorIfExists) { + throw new IOException("Record already exists for " + record.getClass() + + ": " + key); + } + } + return true; + } + + @Override + public boolean removeAll(Class clazz) throws IOException { + checkErrors(); + return valueMap.remove(StateStoreUtils.getRecordName(clazz)) != null; + } + + @Override + @SuppressWarnings("unchecked") + public int remove(Class clazz, + Query query) + throws IOException { + checkErrors(); + int result = 0; + Map map = + valueMap.get(StateStoreUtils.getRecordName(clazz)); + if (map != null) { + for (Iterator itr = map.values().iterator(); itr.hasNext();) { + BaseRecord record = itr.next(); + if (query.matches((T) record)) { + itr.remove(); + result += 1; + } + } + } + return result; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java index dfe2bc98bf4..4289999429b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,8 +20,16 @@ package org.apache.hadoop.hdfs.server.federation.store.records; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; import org.junit.Test; @@ -40,7 +48,7 @@ public class TestRouterState { private static final RouterServiceState STATE = RouterServiceState.RUNNING; - private RouterState generateRecord() throws IOException { + private RouterState generateRecord() { RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE); record.setVersion(VERSION); record.setCompileInfo(COMPILE_INFO); @@ -82,4 +90,45 @@ public class TestRouterState { validateRecord(newRecord); } + + @Test + public void testStateStoreResilience() throws Exception { + StateStoreService service = new StateStoreService(); + Configuration conf = new Configuration(); + conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS, + MockStateStoreDriver.class, + StateStoreDriver.class); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false); + service.init(conf); + MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver(); + // Add two records for block1 + driver.put(MembershipState.newInstance("routerId", "ns1", + "ns1-ha1", "cluster1", "block1", "rpc1", + "service1", "lifeline1", "https", "nn01", + FederationNamenodeServiceState.ACTIVE, false), false, false); + driver.put(MembershipState.newInstance("routerId", "ns1", + "ns1-ha2", "cluster1", "block1", "rpc2", + "service2", "lifeline2", "https", "nn02", + FederationNamenodeServiceState.STANDBY, false), false, false); + // load the cache + service.loadDriver(); + MembershipNamenodeResolver resolver = new MembershipNamenodeResolver(conf, service); + service.refreshCaches(true); + + // look up block1 + List result = + resolver.getNamenodesForBlockPoolId("block1"); + assertEquals(2, result.size()); + + // cause io errors and then reload the cache + driver.setGiveErrors(true); + long previousUpdate = service.getCacheUpdateTime(); + service.refreshCaches(true); + assertEquals(previousUpdate, service.getCacheUpdateTime()); + + // make sure the old cache is still there + result = resolver.getNamenodesForBlockPoolId("block1"); + assertEquals(2, result.size()); + service.stop(); + } }