diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
index d114e354c77..2097fc9d5d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
@@ -18,16 +18,10 @@
 package org.apache.hadoop.hbase.fs;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.BindException;
 import java.net.ServerSocket;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 
@@ -38,28 +32,12 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -72,6 +50,11 @@ import org.junit.experimental.categories.Category;
 
 /**
  * Tests for the hdfs fix from HBASE-6435.
+ *
+ * Please don't add new subtests that involve starting or stopping a MiniDFSCluster to this
+ * class. When a MiniDFSCluster is stopped on Hadoop 3, its shutdown hooks are cleared from
+ * Hadoop's ShutdownHookManager, which later causes a 'Failed suppression of fs shutdown hook'
+ * error in the region server.
  */
 @Category(LargeTests.class)
 public class TestBlockReorder {
@@ -250,235 +233,4 @@ public class TestBlockReorder {
       return res;
     }
   }
-
-  /**
-   * Test that the hook works within HBase, including when there are multiple blocks.
-   */
-  @Test()
-  public void testHBaseCluster() throws Exception {
-    byte[] sb = "sb".getBytes();
-    htu.startMiniZKCluster();
-
-    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
-    hbm.waitForActiveAndReadyMaster();
-    hbm.getRegionServer(0).waitForServerOnline();
-    HRegionServer targetRs = hbm.getRegionServer(0);
-
-    // We want to have a datanode with the same name as the region server, so
-    // we're going to get the regionservername, and start a new datanode with this name.
-    String host4 = targetRs.getServerName().getHostname();
-    LOG.info("Starting a new datanode with the name=" + host4);
-    cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
-    cluster.waitClusterUp();
-
-    final int repCount = 3;
-
-    // We use the regionserver file system & conf as we expect it to have the hook.
-    conf = targetRs.getConfiguration();
-    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
-    Table h = htu.createTable(TableName.valueOf("table"), sb);
-
-    // Now, we have 4 datanodes and a replication count of 3. So we don't know if the datanode
-    // with the same node will be used. We can't really stop an existing datanode, this would
-    // make us fall in nasty hdfs bugs/issues. So we're going to try multiple times.
-
-    // Now we need to find the log file, its locations, and look at it
-
-    String walDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
-        "/" + targetRs.getServerName().toString()).toUri().getPath();
-
-    DistributedFileSystem mdfs = (DistributedFileSystem)
-        hbm.getMaster().getMasterFileSystem().getFileSystem();
-
-
-    int nbTest = 0;
-    while (nbTest < 10) {
-      final List regions = targetRs.getOnlineRegions(h.getName());
-      final CountDownLatch latch = new CountDownLatch(regions.size());
-      // listen for successful log rolls
-      final WALActionsListener listener = new WALActionsListener.Base() {
-        @Override
-        public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
-          latch.countDown();
-        }
-      };
-      for (Region region : regions) {
-        ((HRegion)region).getWAL().registerWALActionsListener(listener);
-      }
-
-      htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName());
-
-      // wait
-      try {
-        latch.await();
-      } catch (InterruptedException exception) {
-        LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " +
-            "tests fail, it's probably because we should still be waiting.");
-        Thread.currentThread().interrupt();
-      }
-      for (Region region : regions) {
-        ((HRegion)region).getWAL().unregisterWALActionsListener(listener);
-      }
-
-      // We need a sleep as the namenode is informed asynchronously
-      Thread.sleep(100);
-
-      // insert one put to ensure a minimal size
-      Put p = new Put(sb);
-      p.add(sb, sb, sb);
-      h.put(p);
-
-      DirectoryListing dl = dfs.getClient().listPaths(walDir, HdfsFileStatus.EMPTY_NAME);
-      HdfsFileStatus[] hfs = dl.getPartialListing();
-
-      // As we wrote a put, we should have at least one log file.
-      Assert.assertTrue(hfs.length >= 1);
-      for (HdfsFileStatus hf : hfs) {
-        // Because this is a live cluster, log files might get archived while we're processing
-        try {
-          LOG.info("Log file found: " + hf.getLocalName() + " in " + walDir);
-          String logFile = walDir + "/" + hf.getLocalName();
-          FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
-
-          LOG.info("Checking log file: " + logFile);
-          // Now checking that the hook is up and running
-          // We can't call directly getBlockLocations, it's not available in HFileSystem
-          // We're trying multiple times to be sure, as the order is random
-
-          BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
-          if (bls.length > 0) {
-            BlockLocation bl = bls[0];
-
-            LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
-            for (int i = 0; i < bl.getHosts().length - 1; i++) {
-              LOG.info(bl.getHosts()[i] + " " + logFile);
-              Assert.assertNotSame(bl.getHosts()[i], host4);
-            }
-            String last = bl.getHosts()[bl.getHosts().length - 1];
-            LOG.info(last + " " + logFile);
-            if (host4.equals(last)) {
-              nbTest++;
-              LOG.info(logFile + " is on the new datanode and is ok");
-              if (bl.getHosts().length == 3) {
-                // We can test this case from the file system as well
-                // Checking the underlying file system. Multiple times as the order is random
-                testFromDFS(dfs, logFile, repCount, host4);
-
-                // now from the master
-                testFromDFS(mdfs, logFile, repCount, host4);
-              }
-            }
-          }
-        } catch (FileNotFoundException exception) {
-          LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
-              "archived out from under us so we'll ignore and retry. If this test hangs " +
-              "indefinitely you should treat this failure as a symptom.", exception);
-        } catch (RemoteException exception) {
-          if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
-            LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
-                "archived out from under us so we'll ignore and retry. If this test hangs " +
-                "indefinitely you should treat this failure as a symptom.", exception);
-          } else {
-            throw exception;
-          }
-        }
-      }
-    }
-  }
-
-  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
-      throws Exception {
-    // Multiple times as the order is random
-    for (int i = 0; i < 10; i++) {
-      LocatedBlocks l;
-      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
-      final long max = System.currentTimeMillis() + 10000;
-      boolean done;
-      do {
-        Assert.assertTrue("Can't get enouth replica.", System.currentTimeMillis() < max);
-        l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
-        Assert.assertNotNull("Can't get block locations for " + src, l);
-        Assert.assertNotNull(l.getLocatedBlocks());
-        Assert.assertTrue(l.getLocatedBlocks().size() > 0);
-
-        done = true;
-        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
-          done = (l.get(y).getLocations().length == repCount);
-        }
-      } while (!done);
-
-      for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
-        Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
-      }
-    }
-  }
-
-  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
-    Field nf = DFSClient.class.getDeclaredField("namenode");
-    nf.setAccessible(true);
-    return (ClientProtocol) nf.get(dfsc);
-  }
-
-  /**
-   * Test that the reorder algo works as we expect.
-   */
-  @Test
-  public void testBlockLocation() throws Exception {
-    // We need to start HBase to get HConstants.HBASE_DIR set in conf
-    htu.startMiniZKCluster();
-    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
-    conf = hbm.getConfiguration();
-
-
-    // The "/" is mandatory, without it we've got a null pointer exception on the namenode
-    final String fileName = "/helloWorld";
-    Path p = new Path(fileName);
-
-    final int repCount = 3;
-    Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount);
-
-    // Let's write the file
-    FSDataOutputStream fop = dfs.create(p, (short) repCount);
-    final double toWrite = 875.5613;
-    fop.writeDouble(toWrite);
-    fop.close();
-
-    for (int i=0; i<10; i++){
-      // The interceptor is not set in this test, so we get the raw list at this point
-      LocatedBlocks l;
-      final long max = System.currentTimeMillis() + 10000;
-      do {
-        l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
-        Assert.assertNotNull(l.getLocatedBlocks());
-        Assert.assertEquals(l.getLocatedBlocks().size(), 1);
-        Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length,
-            System.currentTimeMillis() < max);
-      } while (l.get(0).getLocations().length != repCount);
-
-      // Should be filtered, the name is different => The order won't change
-      Object originalList[] = l.getLocatedBlocks().toArray();
-      HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
-      lrb.reorderBlocks(conf, l, fileName);
-      Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());
-
-      // Should be reordered, as we pretend to be a file name with a compliant stuff
-      Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
-      Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
-      String pseudoLogFile = conf.get(CommonFSUtils.HBASE_WAL_DIR) + "/" +
-          HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
-
-      // Check that it will be possible to extract a ServerName from our construction
-      Assert.assertNotNull("log= " + pseudoLogFile,
-          DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));
-
-      // And check we're doing the right reorder.
-      lrb.reorderBlocks(conf, l, pseudoLogFile);
-      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
-
-      // Check again, it should remain the same.
-      lrb.reorderBlocks(conf, l, pseudoLogFile);
-      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
-    }
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderBlockLocation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderBlockLocation.java
new file mode 100644
index 00000000000..0e46e9642d8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderBlockLocation.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.fs;
+
+import java.lang.reflect.Field;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Tests for the hdfs fix from HBASE-6435.
+ *
+ * Please don't add new subtests that involve starting or stopping a MiniDFSCluster to this
+ * class. When a MiniDFSCluster is stopped on Hadoop 3, its shutdown hooks are cleared from
+ * Hadoop's ShutdownHookManager, which later causes a 'Failed suppression of fs shutdown hook'
+ * error in the region server.
+ */
+@Category({MiscTests.class, LargeTests.class})
+public class TestBlockReorderBlockLocation {
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private HBaseTestingUtility htu;
+  private DistributedFileSystem dfs;
+  private static final String host1 = "host1";
+  private static final String host2 = "host2";
+  private static final String host3 = "host3";
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    htu = new HBaseTestingUtility();
+    htu.getConfiguration().setInt("dfs.blocksize", 1024); // for the test with multiple blocks
+    htu.getConfiguration().setInt("dfs.replication", 3);
+    htu.startMiniDFSCluster(3,
+        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});
+
+    conf = htu.getConfiguration();
+    cluster = htu.getDFSCluster();
+    dfs = (DistributedFileSystem) FileSystem.get(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    htu.shutdownMiniCluster();
+  }
+
+
+  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
+    Field nf = DFSClient.class.getDeclaredField("namenode");
+    nf.setAccessible(true);
+    return (ClientProtocol) nf.get(dfsc);
+  }
+
+  /**
+   * Test that the reorder algorithm works as we expect.
+   */
+  @Test
+  public void testBlockLocation() throws Exception {
+    // We need to start HBase to get HConstants.HBASE_DIR set in conf
+    htu.startMiniZKCluster();
+    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
+    conf = hbm.getConfiguration();
+
+
+    // The "/" is mandatory, without it we've got a null pointer exception on the namenode
+    final String fileName = "/helloWorld";
+    Path p = new Path(fileName);
+
+    final int repCount = 3;
+    Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount);
+
+    // Let's write the file
+    FSDataOutputStream fop = dfs.create(p, (short) repCount);
+    final double toWrite = 875.5613;
+    fop.writeDouble(toWrite);
+    fop.close();
+
+    for (int i = 0; i < 10; i++) {
+      // The interceptor is not set in this test, so we get the raw list at this point
+      LocatedBlocks l;
+      final long max = System.currentTimeMillis() + 10000;
+      do {
+        l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
+        Assert.assertNotNull(l.getLocatedBlocks());
+        Assert.assertEquals(1, l.getLocatedBlocks().size());
+        Assert.assertTrue("Expecting " + repCount + ", got " + l.get(0).getLocations().length,
+            System.currentTimeMillis() < max);
+      } while (l.get(0).getLocations().length != repCount);
+
+      // Should be filtered, the name is different => the order won't change
+      Object[] originalList = l.getLocatedBlocks().toArray();
+      HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
+      lrb.reorderBlocks(conf, l, fileName);
+      Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());
+
+      // Should be reordered, as we pretend to be a WAL file with a compliant name
+      Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
+      Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
+      String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
+          HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
+
+      // Check that it will be possible to extract a ServerName from our construction
+      Assert.assertNotNull("log= " + pseudoLogFile,
+          DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));
+
+      // And check we're doing the right reorder.
+      lrb.reorderBlocks(conf, l, pseudoLogFile);
+      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
+
+      // Check again, it should remain the same.
+      lrb.reorderBlocks(conf, l, pseudoLogFile);
+      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
new file mode 100644
index 00000000000..e8049071a8a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for the hdfs fix from HBASE-6435.
+ *
+ * Please don't add new subtests that involve starting or stopping a MiniDFSCluster to this
+ * class. When a MiniDFSCluster is stopped on Hadoop 3, its shutdown hooks are cleared from
+ * Hadoop's ShutdownHookManager, which later causes a 'Failed suppression of fs shutdown hook'
+ * error in the region server.
+ */
+@Category({MiscTests.class, LargeTests.class})
+public class TestBlockReorderMultiBlocks {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestBlockReorderMultiBlocks.class);
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private HBaseTestingUtility htu;
+  private DistributedFileSystem dfs;
+  private static final String host1 = "host1";
+  private static final String host2 = "host2";
+  private static final String host3 = "host3";
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void setUp() throws Exception {
+    htu = new HBaseTestingUtility();
+    htu.getConfiguration().setInt("dfs.blocksize", 1024); // for the test with multiple blocks
+    htu.getConfiguration().setInt("dfs.replication", 3);
+    htu.startMiniDFSCluster(3,
+        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});
+
+    conf = htu.getConfiguration();
+    cluster = htu.getDFSCluster();
+    dfs = (DistributedFileSystem) FileSystem.get(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    htu.shutdownMiniCluster();
+  }
+
+  /**
+   * Test that the hook works within HBase, including when there are multiple blocks.
+   */
+  @Test
+  public void testHBaseCluster() throws Exception {
+    byte[] sb = "sb".getBytes();
+    htu.startMiniZKCluster();
+
+    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
+    hbm.waitForActiveAndReadyMaster();
+    hbm.getRegionServer(0).waitForServerOnline();
+    HRegionServer targetRs = hbm.getRegionServer(0);
+
+    // We want to have a datanode with the same name as the region server, so
+    // we're going to get the region server name, and start a new datanode with this name.
+    String host4 = targetRs.getServerName().getHostname();
+    LOG.info("Starting a new datanode with the name=" + host4);
+    cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
+    cluster.waitClusterUp();
+
+    final int repCount = 3;
+
+    // We use the regionserver file system & conf as we expect it to have the hook.
+    conf = targetRs.getConfiguration();
+    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
+    Table h = htu.createTable(TableName.valueOf("table"), sb);
+
+    // Now, we have 4 datanodes and a replication count of 3. So we don't know if the datanode
+    // with the same name will be used. We can't really stop an existing datanode, as this would
+    // make us hit nasty hdfs bugs/issues. So we're going to try multiple times.
+
+    // Now we need to find the log file, its locations, and look at it
+
+    String walDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
+        "/" + targetRs.getServerName().toString()).toUri().getPath();
+
+    DistributedFileSystem mdfs = (DistributedFileSystem)
+        hbm.getMaster().getMasterFileSystem().getFileSystem();
+
+
+    int nbTest = 0;
+    while (nbTest < 10) {
+      final List regions = targetRs.getOnlineRegions(h.getName());
+      final CountDownLatch latch = new CountDownLatch(regions.size());
+      // listen for successful log rolls
+      final WALActionsListener listener = new WALActionsListener.Base() {
+        @Override
+        public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+          latch.countDown();
+        }
+      };
+      for (Region region : regions) {
+        ((HRegion) region).getWAL().registerWALActionsListener(listener);
+      }
+
+      htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName());
+
+      // wait for the log rolls to complete
+      try {
+        latch.await();
+      } catch (InterruptedException exception) {
+        LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " +
+            "tests fail, it's probably because we should still be waiting.");
+        Thread.currentThread().interrupt();
+      }
+      for (Region region : regions) {
+        ((HRegion) region).getWAL().unregisterWALActionsListener(listener);
+      }
+
+      // We need a sleep as the namenode is informed asynchronously
+      Thread.sleep(100);
+
+      // insert one put to ensure a minimal size
+      Put p = new Put(sb);
+      p.addColumn(sb, sb, sb);
+      h.put(p);
+
+      DirectoryListing dl = dfs.getClient().listPaths(walDir, HdfsFileStatus.EMPTY_NAME);
+      HdfsFileStatus[] hfs = dl.getPartialListing();
+
+      // As we wrote a put, we should have at least one log file.
+      Assert.assertTrue(hfs.length >= 1);
+      for (HdfsFileStatus hf : hfs) {
+        // Because this is a live cluster, log files might get archived while we're processing
+        try {
+          LOG.info("Log file found: " + hf.getLocalName() + " in " + walDir);
+          String logFile = walDir + "/" + hf.getLocalName();
+          FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
+
+          LOG.info("Checking log file: " + logFile);
+          // Now checking that the hook is up and running
+          // We can't call getBlockLocations directly, it's not available in HFileSystem
+          // We're trying multiple times to be sure, as the order is random
+
+          BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
+          if (bls.length > 0) {
+            BlockLocation bl = bls[0];
+
+            LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
+            for (int i = 0; i < bl.getHosts().length - 1; i++) {
+              LOG.info(bl.getHosts()[i] + " " + logFile);
+              Assert.assertNotSame(bl.getHosts()[i], host4);
+            }
+            String last = bl.getHosts()[bl.getHosts().length - 1];
+            LOG.info(last + " " + logFile);
+            if (host4.equals(last)) {
+              nbTest++;
+              LOG.info(logFile + " is on the new datanode and is ok");
+              if (bl.getHosts().length == 3) {
+                // We can test this case from the file system as well.
+                // Checking the underlying file system, multiple times as the order is random
+                testFromDFS(dfs, logFile, repCount, host4);
+
+                // now from the master
+                testFromDFS(mdfs, logFile, repCount, host4);
+              }
+            }
+          }
+        } catch (FileNotFoundException exception) {
+          LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
+              "archived out from under us so we'll ignore and retry. If this test hangs " +
+              "indefinitely you should treat this failure as a symptom.", exception);
+        } catch (RemoteException exception) {
+          if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
+            LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
+                "archived out from under us so we'll ignore and retry. If this test hangs " +
+                "indefinitely you should treat this failure as a symptom.", exception);
+          } else {
+            throw exception;
+          }
+        }
+      }
+    }
+  }
+
+  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
+      throws Exception {
+    // Multiple times as the order is random
+    for (int i = 0; i < 10; i++) {
+      LocatedBlocks l;
+      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
+      final long max = System.currentTimeMillis() + 10000;
+      boolean done;
+      do {
+        Assert.assertTrue("Can't get enough replicas.", System.currentTimeMillis() < max);
+        l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
+        Assert.assertNotNull("Can't get block locations for " + src, l);
+        Assert.assertNotNull(l.getLocatedBlocks());
+        Assert.assertTrue(l.getLocatedBlocks().size() > 0);
+
+        done = true;
+        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
+          done = (l.get(y).getLocations().length == repCount);
+        }
+      } while (!done);
+
+      for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
+        Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
+      }
+    }
+  }
+
+  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
+    Field nf = DFSClient.class.getDeclaredField("namenode");
+    nf.setAccessible(true);
+    return (ClientProtocol) nf.get(dfsc);
+  }
+
+}
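
A note on the class comment repeated in all three files, since it names the failure mode without showing it: the breakage is a process-wide hook-ordering problem, not something local to these tests. Below is a minimal sketch, not part of the patch, of the interaction as I understand it; the class name and the stand-in finalizer are hypothetical, and it assumes Hadoop's ShutdownHookManager API (addShutdownHook / clearShutdownHooks / removeShutdownHook). The real suppression logic lives in org.apache.hadoop.hbase.regionserver.ShutdownHook.

```java
import org.apache.hadoop.util.ShutdownHookManager;

public class ShutdownHookClearingSketch {
  public static void main(String[] args) {
    // Stand-in for the FileSystem client finalizer whose shutdown hook
    // an HBase region server later tries to take over. Priority 10 is arbitrary.
    Runnable fsFinalizer = () -> System.out.println("closing cached filesystems");
    ShutdownHookManager.get().addShutdownHook(fsFinalizer, 10);

    // What stopping a MiniDFSCluster does on Hadoop 3: every registered hook
    // is cleared, including hooks the cluster did not add itself.
    ShutdownHookManager.get().clearShutdownHooks();

    // A region server started after this point finds no hook to remove,
    // which HBase surfaces as 'Failed suppression of fs shutdown hook'.
    if (!ShutdownHookManager.get().removeShutdownHook(fsFinalizer)) {
      throw new RuntimeException("Failed suppression of fs shutdown hook: " + fsFinalizer);
    }
  }
}
```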
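
For context on the assertions in testBlockLocation and testHBaseCluster: the HBASE-6435 hook reorders a WAL file's block locations so that the replica on the WAL owner's own host comes last, because during log splitting that host is the one most likely to be dead. The helper below is a simplified stand-in operating on plain String arrays; the real HFileSystem.ReorderWALBlocks rewrites the DatanodeInfo arrays inside a LocatedBlocks instead.

```java
import java.util.Arrays;

public class ReorderWALBlocksSketch {

  // Move the replica hosted on the WAL's own server to the end of the list.
  static String[] reorder(String[] hosts, String walHost) {
    String[] result = new String[hosts.length];
    int next = 0;
    boolean found = false;
    for (String host : hosts) {
      if (!found && host.equals(walHost)) {
        found = true; // hold one copy back for the tail slot
      } else {
        result[next++] = host;
      }
    }
    if (found) {
      result[next] = walHost; // the WAL's own host goes last
    }
    return result;
  }

  public static void main(String[] args) {
    // As in testBlockLocation: host1 appears in the pseudo WAL path,
    // so its replica must end up in the last of the three locations.
    String[] hosts = {"host1", "host2", "host3"};
    System.out.println(Arrays.toString(reorder(hosts, "host1"))); // [host2, host3, host1]
  }
}
```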