diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java index 1acbd75d292..a49425260eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java @@ -18,28 +18,24 @@ package org.apache.hadoop.hdfs; -import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertThat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory; -import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; -import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; -import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; @@ -61,55 +57,31 @@ import java.util.concurrent.TimeUnit; * to the most recent alignment state of the server. */ public class TestStateAlignmentContextWithHA { + public static final Logger LOG = + LoggerFactory.getLogger(TestStateAlignmentContextWithHA.class.getName()); private static final int NUMDATANODES = 1; private static final int NUMCLIENTS = 10; - private static final int NUMFILES = 300; + private static final int NUMFILES = 120; private static final Configuration CONF = new HdfsConfiguration(); - private static final String NAMESERVICE = "nameservice"; private static final List AC_LIST = new ArrayList<>(); private static MiniDFSCluster cluster; private static List clients; - private static ClientGSIContext spy; private DistributedFileSystem dfs; private int active = 0; private int standby = 1; - static class AlignmentContextProxyProvider - extends ConfiguredFailoverProxyProvider { + static class ORPPwithAlignmentContexts + extends ObserverReadProxyProvider { - private ClientGSIContext alignmentContext; - - public AlignmentContextProxyProvider( + public ORPPwithAlignmentContexts( Configuration conf, URI uri, Class xface, HAProxyFactory factory) throws IOException { super(conf, uri, xface, factory); - // Create and set AlignmentContext in HAProxyFactory. - // All proxies by factory will now have AlignmentContext assigned. - this.alignmentContext = (spy != null ? spy : new ClientGSIContext()); - ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext); - - AC_LIST.add(alignmentContext); - } - } - - static class SpyConfiguredContextProxyProvider - extends ConfiguredFailoverProxyProvider { - - private ClientGSIContext alignmentContext; - - public SpyConfiguredContextProxyProvider( - Configuration conf, URI uri, Class xface, - HAProxyFactory factory) throws IOException { - super(conf, uri, xface, factory); - - // Create but DON'T set in HAProxyFactory. - this.alignmentContext = (spy != null ? spy : new ClientGSIContext()); - - AC_LIST.add(alignmentContext); + AC_LIST.add((ClientGSIContext) getAlignmentContext()); } } @@ -121,23 +93,21 @@ public class TestStateAlignmentContextWithHA { CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); CONF.setBoolean("fs.hdfs.impl.disable.cache", true); - MiniDFSNNTopology.NSConf nsConf = new MiniDFSNNTopology.NSConf(NAMESERVICE); - nsConf.addNN(new MiniDFSNNTopology.NNConf("nn1")); - nsConf.addNN(new MiniDFSNNTopology.NNConf("nn2")); - cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES) - .nnTopology(MiniDFSNNTopology.simpleHATopology().addNameservice(nsConf)) + .nnTopology(MiniDFSNNTopology.simpleHATopology(3)) .build(); cluster.waitActive(); cluster.transitionToActive(0); + cluster.transitionToObserver(2); + + String nameservice = HATestUtil.getLogicalHostname(cluster); + HATestUtil.setFailoverConfigurations(cluster, CONF, nameservice, 0); + CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." + nameservice, ORPPwithAlignmentContexts.class.getName()); } @Before public void before() throws IOException, URISyntaxException { - killWorkers(); - HATestUtil.setFailoverConfigurations(cluster, CONF, NAMESERVICE, 0); - CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + - "." + NAMESERVICE, AlignmentContextProxyProvider.class.getName()); dfs = (DistributedFileSystem) FileSystem.get(CONF); } @@ -151,6 +121,7 @@ public class TestStateAlignmentContextWithHA { @After public void after() throws IOException { + killWorkers(); cluster.transitionToStandby(1); cluster.transitionToActive(0); active = 0; @@ -160,26 +131,6 @@ public class TestStateAlignmentContextWithHA { dfs = null; } AC_LIST.clear(); - spy = null; - } - - /** - * This test checks if after a client writes we can see the state id in - * updated via the response. - */ - @Test - public void testNoStateOnConfiguredProxyProvider() throws Exception { - Configuration confCopy = new Configuration(CONF); - confCopy.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + - "." + NAMESERVICE, SpyConfiguredContextProxyProvider.class.getName()); - - try (DistributedFileSystem clearDfs = - (DistributedFileSystem) FileSystem.get(confCopy)) { - ClientGSIContext clientState = getContext(1); - assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); - DFSTestUtil.writeFile(clearDfs, new Path("/testFileNoState"), "no_state"); - assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE)); - } } /** @@ -233,48 +184,6 @@ public class TestStateAlignmentContextWithHA { } } - /** - * This test mocks an AlignmentContext and ensures that DFSClient - * writes its lastSeenStateId into RPC requests. - */ - @Test - public void testClientSendsState() throws Exception { - ClientGSIContext alignmentContext = new ClientGSIContext(); - ClientGSIContext spiedAlignContext = Mockito.spy(alignmentContext); - spy = spiedAlignContext; - - try (DistributedFileSystem clearDfs = - (DistributedFileSystem) FileSystem.get(CONF)) { - - // Collect RpcRequestHeaders for verification later. - final List headers = - new ArrayList<>(); - Mockito.doAnswer(a -> { - Object[] arguments = a.getArguments(); - RpcHeaderProtos.RpcRequestHeaderProto.Builder header = - (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0]; - headers.add(header); - return a.callRealMethod(); - }).when(spiedAlignContext).updateRequestState(Mockito.any()); - - DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv"); - - // Ensure first header and last header have different state. - assertThat(headers.size() > 1, is(true)); - assertThat(headers.get(0).getStateId(), - is(not(headers.get(headers.size() - 1)))); - - // Ensure collected RpcRequestHeaders are in increasing order. - long lastHeader = headers.get(0).getStateId(); - for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header : - headers.subList(1, headers.size())) { - long currentHeader = header.getStateId(); - assertThat(currentHeader >= lastHeader, is(true)); - lastHeader = header.getStateId(); - } - } - } - /** * This test checks if after a client writes we can see the state id in * updated via the response. @@ -310,14 +219,22 @@ public class TestStateAlignmentContextWithHA { @Test(timeout=300000) public void testMultiClientStatesWithRandomFailovers() throws Exception { - // We want threads to run during failovers; assuming at minimum 4 cores, - // would like to see 2 clients competing against 2 NameNodes. + // First run, half the load, with one failover. + runClientsWithFailover(1, NUMCLIENTS/2, NUMFILES/2); + // Second half, with fail back. + runClientsWithFailover(NUMCLIENTS/2 + 1, NUMCLIENTS, NUMFILES/2); + } + + private void runClientsWithFailover(int clientStartId, + int numClients, + int numFiles) + throws Exception { ExecutorService execService = Executors.newFixedThreadPool(2); - clients = new ArrayList<>(NUMCLIENTS); - for (int i = 1; i <= NUMCLIENTS; i++) { + clients = new ArrayList<>(numClients); + for (int i = clientStartId; i <= numClients; i++) { DistributedFileSystem haClient = (DistributedFileSystem) FileSystem.get(CONF); - clients.add(new Worker(haClient, NUMFILES, "/testFile3FO_", i)); + clients.add(new Worker(haClient, numFiles, "/testFile3FO_", i)); } // Execute workers in threadpool with random failovers. @@ -325,15 +242,18 @@ public class TestStateAlignmentContextWithHA { execService.shutdown(); boolean finished = false; + failOver(); + while (!finished) { - failOver(); - finished = execService.awaitTermination(1L, TimeUnit.SECONDS); + finished = execService.awaitTermination(20L, TimeUnit.SECONDS); } // Validation. for (Future future : futures) { assertThat(future.get(), is(STATE.SUCCESS)); } + + clients.clear(); } private ClientGSIContext getContext(int clientCreationIndex) { @@ -341,7 +261,9 @@ public class TestStateAlignmentContextWithHA { } private void failOver() throws IOException { + LOG.info("Transitioning Active to Standby"); cluster.transitionToStandby(active); + LOG.info("Transitioning Standby to Active"); cluster.transitionToActive(standby); int tempActive = active; active = standby; @@ -388,30 +310,36 @@ public class TestStateAlignmentContextWithHA { @Override public STATE call() { + int i = -1; try { - for (int i = 0; i < filesToMake; i++) { - long preClientStateFO = - getContext(nonce).getLastSeenStateId(); + for (i = 0; i < filesToMake; i++) { + ClientGSIContext gsiContext = getContext(nonce); + long preClientStateFO = gsiContext.getLastSeenStateId(); // Write using HA client. - Path path = new Path(filePath + nonce + i); + Path path = new Path(filePath + nonce + "_" + i); DFSTestUtil.writeFile(client, path, "erk"); - long postClientStateFO = - getContext(nonce).getLastSeenStateId(); + long postClientStateFO = gsiContext.getLastSeenStateId(); // Write(s) should have increased state. Check for greater than. - if (postClientStateFO <= preClientStateFO) { - System.out.println("FAIL: Worker started with: " + - preClientStateFO + ", but finished with: " + postClientStateFO); + if (postClientStateFO < 0 || postClientStateFO <= preClientStateFO) { + LOG.error("FAIL: Worker started with: {} , but finished with: {}", + preClientStateFO, postClientStateFO); return STATE.FAIL; } + + if(i % (NUMFILES/10) == 0) { + LOG.info("Worker {} created {} files", nonce, i); + LOG.info("LastSeenStateId = {}", postClientStateFO); + } } - client.close(); return STATE.SUCCESS; - } catch (IOException e) { - System.out.println("ERROR: Worker failed with: " + e); + } catch (Exception e) { + LOG.error("ERROR: Worker failed with: ", e); return STATE.ERROR; + } finally { + LOG.info("Worker {} created {} files", nonce, i); } }