diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 20fc9efe57e..f0d4f8921a3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -124,12 +124,28 @@ public static void setCallIdAndRetryCount(int cid, int rc, Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID); Preconditions.checkState(callId.get() == null); Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT); + setCallIdAndRetryCountUnprotected(cid, rc, externalHandler); + } + public static void setCallIdAndRetryCountUnprotected(Integer cid, int rc, + Object externalHandler) { callId.set(cid); retryCount.set(rc); EXTERNAL_CALL_HANDLER.set(externalHandler); } + public static int getCallId() { + return callId.get() != null ? callId.get() : nextCallId(); + } + + public static int getRetryCount() { + return retryCount.get() != null ? retryCount.get() : 0; + } + + public static Object getExternalHandler() { + return EXTERNAL_CALL_HANDLER.get(); + } + private final ConcurrentMap connections = new ConcurrentHashMap<>(); private final Object putLock = new Object(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java index 9011b25eda0..5e83fff6b78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import java.io.IOException; -import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Proxy; @@ -27,20 +26,24 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.StandbyException; - -import org.apache.hadoop.io.retry.MultiException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.MultiException; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RpcInvocationHandler; +import org.apache.hadoop.ipc.StandbyException; + /** * A FailoverProxyProvider implementation that technically does not "failover" * per-se. It constructs a wrapper proxy that sends the request to ALL @@ -55,7 +58,7 @@ public class RequestHedgingProxyProvider extends public static final Logger LOG = LoggerFactory.getLogger(RequestHedgingProxyProvider.class); - class RequestHedgingInvocationHandler implements InvocationHandler { + class RequestHedgingInvocationHandler implements RpcInvocationHandler { final Map> targetProxies; // Proxy of the active nn @@ -123,11 +126,18 @@ public RequestHedgingInvocationHandler( } executor = Executors.newFixedThreadPool(proxies.size()); completionService = new ExecutorCompletionService<>(executor); + // Set the callId and other informations from current thread. + final int callId = Client.getCallId(); + final int retryCount = Client.getRetryCount(); + final Object externalHandler = Client.getExternalHandler(); for (final Map.Entry> pEntry : targetProxies .entrySet()) { Callable c = new Callable() { @Override public Object call() throws Exception { + // Call Id and other informations from parent thread. + Client.setCallIdAndRetryCount(callId, retryCount, + externalHandler); LOG.trace("Invoking method {} on proxy {}", method, pEntry.getValue().proxyInfo); return method.invoke(pEntry.getValue().proxy, args); @@ -136,7 +146,9 @@ public Object call() throws Exception { proxyMap.put(completionService.submit(c), pEntry.getValue()); numAttempts++; } - + // Current thread's callId will not be cleared as RPC happens in + // separate threads. Reset the CallId information Forcefully. + Client.setCallIdAndRetryCountUnprotected(null, 0, null); Map badResults = new HashMap<>(); while (numAttempts > 0) { Future callResultFuture = completionService.take(); @@ -189,6 +201,18 @@ public Object call() throws Exception { throw unwrappedException; } } + + @Override + public void close() throws IOException { + } + + @Override + public ConnectionId getConnectionId() { + if (currentUsedProxy == null) { + return null; + } + return RPC.getConnectionIdForProxy(currentUsedProxy.proxy); + } } /** A proxy wrapping {@link RequestHedgingInvocationHandler}. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java index 346f79cee99..5e6cdf5eedc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java @@ -20,6 +20,7 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Proxy; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.URI; @@ -34,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; @@ -101,6 +103,8 @@ public long[] answer(InvocationOnMock invocation) throws Throwable { RequestHedgingProxyProvider provider = new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class, createFactory(badMock, goodMock)); + Assert.assertTrue(Proxy.getInvocationHandler( + provider.getProxy().proxy) instanceof RpcInvocationHandler); long[] stats = provider.getProxy().proxy.getStats(); Assert.assertTrue(stats.length == 1); Mockito.verify(badMock).getStats(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAllowFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAllowFormat.java index da4e71e3c5d..cfed8d23ce3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAllowFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAllowFormat.java @@ -165,7 +165,8 @@ public void testFormatShouldBeIgnoredForNonFileBasedDirs() throws Exception { String localhost = "127.0.0.1"; InetSocketAddress nnAddr1 = new InetSocketAddress(localhost, 8020); InetSocketAddress nnAddr2 = new InetSocketAddress(localhost, 8020); - HATestUtil.setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2); + HATestUtil.setFailoverConfigurations(conf, logicalName, null, + nnAddr1, nnAddr2); conf.set(DFS_NAMENODE_NAME_DIR_KEY, new File(DFS_BASE_DIR, "name").getAbsolutePath()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 307fe04618b..ca772e18167 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -17,13 +17,6 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; -import static org.apache.hadoop.hdfs.DFSUtil.createUri; - import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Proxy; @@ -36,11 +29,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.LongAccumulator; +import java.util.function.Supplier; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.ClientGSIContext; @@ -49,6 +43,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -59,47 +54,52 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; -import java.util.function.Supplier; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSUtil.createUri; /** * Static utility functions useful for testing HA. */ public abstract class HATestUtil { private static final Logger LOG = LoggerFactory.getLogger(HATestUtil.class); - + private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d"; - + /** * Trigger an edits log roll on the active and then wait for the standby to * catch up to all the edits done by the active. This method will check * repeatedly for up to NN_LAG_TIMEOUT milliseconds, and then fail throwing * {@link CouldNotCatchUpException} - * + * * @param active active NN * @param standby standby NN which should catch up to active * @throws IOException if an error occurs rolling the edit log * @throws CouldNotCatchUpException if the standby doesn't catch up to the * active in NN_LAG_TIMEOUT milliseconds */ - public static void waitForStandbyToCatchUp(NameNode active, - NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException { - long activeTxId = active.getNamesystem().getFSImage().getEditLog() - .getLastWrittenTxId(); + public static void waitForStandbyToCatchUp(NameNode active, NameNode standby) + throws InterruptedException, IOException, CouldNotCatchUpException { + long activeTxId = + active.getNamesystem().getFSImage().getEditLog().getLastWrittenTxId(); active.getRpcServer().rollEditLog(); long start = Time.now(); while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) { - long nn2HighestTxId = standby.getNamesystem().getFSImage() - .getLastAppliedTxId(); + long nn2HighestTxId = + standby.getNamesystem().getFSImage().getLastAppliedTxId(); if (nn2HighestTxId >= activeTxId) { return; } Thread.sleep(TestEditLogTailer.SLEEP_TIME); } - throw new CouldNotCatchUpException("Standby did not catch up to txid " + - activeTxId + " (currently at " + - standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")"); + throw new CouldNotCatchUpException( + "Standby did not catch up to txid " + activeTxId + " (currently at " + + standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")"); } /** @@ -119,7 +119,7 @@ public Boolean get() { return true; } }, 1000, 10000); - + } /** @@ -144,16 +144,18 @@ public CouldNotCatchUpException(String message) { super(message); } } - - /** Gets the filesystem instance by setting the failover configurations */ + + /** + * Gets the filesystem instance by setting the failover configurations. + */ public static DistributedFileSystem configureFailoverFs( MiniDFSCluster cluster, Configuration conf) throws IOException, URISyntaxException { return configureFailoverFs(cluster, conf, 0); } - /** - * Gets the filesystem instance by setting the failover configurations + /** + * Gets the filesystem instance by setting the failover configurations. * @param cluster the single process DFS cluster * @param conf cluster configuration * @param nsIndex namespace index starting with zero @@ -164,13 +166,13 @@ public static DistributedFileSystem configureFailoverFs( int nsIndex) throws IOException, URISyntaxException { conf = new Configuration(conf); String logicalName = getLogicalHostname(cluster); - setFailoverConfigurations(cluster, conf, logicalName, nsIndex); + setFailoverConfigurations(cluster, conf, logicalName, null, nsIndex); FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf); return (DistributedFileSystem)fs; } public static

> - DistributedFileSystem configureObserverReadFs( + DistributedFileSystem configureObserverReadFs( MiniDFSCluster cluster, Configuration conf, Class

classFPP, boolean isObserverReadEnabled) throws IOException, URISyntaxException { @@ -246,8 +248,8 @@ public static MiniQJMHACluster setUpObserverCluster( return qjmhaCluster; } - public static

> - void setupHAConfiguration(MiniDFSCluster cluster, + public static

> void + setupHAConfiguration(MiniDFSCluster cluster, Configuration conf, int nsIndex, Class

classFPP) { MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex); List nnAddresses = new ArrayList(); @@ -264,18 +266,23 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf) { setFailoverConfigurations(cluster, conf, getLogicalHostname(cluster)); } - + /** Sets the required configurations for performing failover of default namespace. */ public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName) { - setFailoverConfigurations(cluster, conf, logicalName, 0); + setFailoverConfigurations(cluster, conf, logicalName, null, 0); } - + /** Sets the required configurations for performing failover. */ public static void setFailoverConfigurations(MiniDFSCluster cluster, - Configuration conf, String logicalName, int nsIndex) { - setFailoverConfigurations(cluster, conf, logicalName, nsIndex, - ConfiguredFailoverProxyProvider.class); + Configuration conf, String logicalName, String proxyProvider, + int nsIndex) { + MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex); + List nnAddresses = new ArrayList(3); + for (MiniDFSCluster.NameNodeInfo nn : nns) { + nnAddresses.add(nn.nameNode.getNameNodeAddress()); + } + setFailoverConfigurations(conf, logicalName, proxyProvider, nnAddresses); } /** Sets the required configurations for performing failover. */ @@ -290,19 +297,56 @@ public static void setFailoverConfigurations(MiniDFSCluster cluster, setFailoverConfigurations(conf, logicalName, nnAddresses, classFPP); } - public static void setFailoverConfigurations(Configuration conf, String logicalName, - InetSocketAddress ... nnAddresses){ - setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses), - ConfiguredFailoverProxyProvider.class); + public static void setFailoverConfigurations(Configuration conf, + String logicalName, String proxyProvider, + InetSocketAddress... nnAddresses) { + setFailoverConfigurations(conf, logicalName, proxyProvider, + Arrays.asList(nnAddresses)); } /** - * Sets the required configurations for performing failover + * Sets the required configurations for performing failover. + */ + public static void setFailoverConfigurations( + Configuration conf, String logicalName, + String proxyProvider, List nnAddresses) { + final List addresses = new ArrayList<>(); + nnAddresses.forEach(addr -> + addresses.add("hdfs://" + addr.getHostName() + ":" + addr.getPort())); + setFailoverConfigurations(conf, logicalName, proxyProvider, addresses); + } + + public static void setFailoverConfigurations( + Configuration conf, String logicalName, + String proxyProvider, Iterable nnAddresses) { + List nnids = new ArrayList(); + int i = 0; + for (String address : nnAddresses) { + String nnId = "nn" + (i + 1); + nnids.add(nnId); + conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, nnId), address); + i++; + } + conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName); + conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName), + Joiner.on(',').join(nnids)); + if (proxyProvider == null) { + conf.set(Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, + ConfiguredFailoverProxyProvider.class.getName()); + } else { + conf.set(Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName, + proxyProvider); + } + conf.set("fs.defaultFS", "hdfs://" + logicalName); + } + + /** + * Sets the required configurations for performing failover. */ public static

> void setFailoverConfigurations(Configuration conf, String logicalName, List nnAddresses, Class

classFPP) { - final List addresses = new ArrayList(); + final List addresses = new ArrayList<>(); nnAddresses.forEach( addr -> addresses.add( "hdfs://" + addr.getHostName() + ":" + addr.getPort())); @@ -310,7 +354,7 @@ public static void setFailoverConfigurations(Configuration conf, String logicalN } public static

> - void setFailoverConfigurations( + void setFailoverConfigurations( Configuration conf, String logicalName, Iterable nnAddresses, Class

classFPP) { List nnids = new ArrayList(); @@ -332,13 +376,13 @@ void setFailoverConfigurations( public static String getLogicalHostname(MiniDFSCluster cluster) { return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId()); } - + public static URI getLogicalUri(MiniDFSCluster cluster) throws URISyntaxException { return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + getLogicalHostname(cluster)); } - + public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx, List txids) throws InterruptedException { long start = Time.now(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java index bd54ba2e989..168273117b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java @@ -95,7 +95,7 @@ public void setupCluster() throws Exception { cluster.waitActive(); String logicalName = HATestUtil.getLogicalHostname(cluster); - HATestUtil.setFailoverConfigurations(cluster, conf, logicalName, 0); + HATestUtil.setFailoverConfigurations(cluster, conf, logicalName, null, 0); nn0 = cluster.getNameNode(0); nn1 = cluster.getNameNode(1); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAFsck.java index 46ebb8f1042..c6e3d139e17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAFsck.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.util.Arrays; + +import org.junit.Test; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.slf4j.event.Level; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -32,15 +35,30 @@ import org.apache.hadoop.hdfs.tools.DFSck; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ToolRunner; -import org.slf4j.event.Level; -import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TestHAFsck { static { GenericTestUtils.setLogLevel(DFSUtil.LOG, Level.TRACE); } - + + @Parameter + private String proxyProvider; + + public String getProxyProvider() { + return proxyProvider; + } + + @Parameterized.Parameters(name = "ProxyProvider: {0}") + public static Iterable data() { + return Arrays.asList(new Object[][] + {{ConfiguredFailoverProxyProvider.class.getName()}, + {RequestHedgingProxyProvider.class.getName()}}); + } + /** * Test that fsck still works with HA enabled. */ @@ -65,9 +83,9 @@ public void testHaFsck() throws Exception { cluster.transitionToActive(0); // Make sure conf has the relevant HA configs. - HATestUtil.setFailoverConfigurations(cluster, conf, "ha-nn-uri-0", 0); + HATestUtil.setFailoverConfigurations(cluster, conf, "ha-nn-uri-0", getProxyProvider(), 0); - fs = HATestUtil.configureFailoverFs(cluster, conf); + fs = FileSystem.get(conf); fs.mkdirs(new Path("/test1")); fs.mkdirs(new Path("/test2"));