HDFS-13989. RBF: Add FSCK to the Router (#1832)
Co-authored-by: Inigo Goiri <inigoiri@apache.org>
This commit is contained in:
parent
f09710bbb8
commit
0ddb5f0881
|
@ -0,0 +1,159 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Wrapper for the Router to offer the Namenode FSCK.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RouterFsck {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(RouterFsck.class.getName());
|
||||
|
||||
private final Router router;
|
||||
private final InetAddress remoteAddress;
|
||||
private final PrintWriter out;
|
||||
private final Map<String, String[]> pmap;
|
||||
|
||||
public RouterFsck(Router router, Map<String, String[]> pmap,
|
||||
PrintWriter out, InetAddress remoteAddress) {
|
||||
this.router = router;
|
||||
this.remoteAddress = remoteAddress;
|
||||
this.out = out;
|
||||
this.pmap = pmap;
|
||||
}
|
||||
|
||||
public void fsck() {
|
||||
final long startTime = Time.monotonicNow();
|
||||
try {
|
||||
String msg = "Federated FSCK started by " +
|
||||
UserGroupInformation.getCurrentUser() + " from " + remoteAddress +
|
||||
" at " + new Date();
|
||||
LOG.info(msg);
|
||||
out.println(msg);
|
||||
|
||||
// Check each Namenode in the federation
|
||||
StateStoreService stateStore = router.getStateStore();
|
||||
MembershipStore membership =
|
||||
stateStore.getRegisteredRecordStore(MembershipStore.class);
|
||||
GetNamenodeRegistrationsRequest request =
|
||||
GetNamenodeRegistrationsRequest.newInstance();
|
||||
GetNamenodeRegistrationsResponse response =
|
||||
membership.getNamenodeRegistrations(request);
|
||||
List<MembershipState> memberships = response.getNamenodeMemberships();
|
||||
Collections.sort(memberships);
|
||||
for (MembershipState nn : memberships) {
|
||||
if (nn.getState() == FederationNamenodeServiceState.ACTIVE) {
|
||||
try {
|
||||
String webAddress = nn.getWebAddress();
|
||||
out.write("Checking " + nn + " at " + webAddress + "\n");
|
||||
remoteFsck(nn);
|
||||
} catch (IOException ioe) {
|
||||
out.println("Cannot query " + nn + ": " + ioe.getMessage() + "\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
out.println("Federated FSCK ended at " + new Date() + " in "
|
||||
+ (Time.monotonicNow() - startTime + " milliseconds"));
|
||||
} catch (Exception e) {
|
||||
String errMsg = "Fsck " + e.getMessage();
|
||||
LOG.warn(errMsg, e);
|
||||
out.println("Federated FSCK ended at " + new Date() + " in "
|
||||
+ (Time.monotonicNow() - startTime + " milliseconds"));
|
||||
out.println(e.getMessage());
|
||||
out.print("\n\n" + errMsg);
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform FSCK in a remote Namenode.
|
||||
*
|
||||
* @param nn The state of the remote NameNode
|
||||
* @throws IOException Failed to fsck in a remote NameNode
|
||||
*/
|
||||
private void remoteFsck(MembershipState nn) throws IOException {
|
||||
final String scheme = nn.getWebScheme();
|
||||
final String webAddress = nn.getWebAddress();
|
||||
final String args = getURLArguments(pmap);
|
||||
final URL url = new URL(scheme + "://" + webAddress + "/fsck?" + args);
|
||||
|
||||
// Connect to the Namenode and output
|
||||
final URLConnection conn = url.openConnection();
|
||||
try (InputStream is = conn.getInputStream();
|
||||
InputStreamReader isr =
|
||||
new InputStreamReader(is, StandardCharsets.UTF_8);
|
||||
BufferedReader br = new BufferedReader(isr)) {
|
||||
String line;
|
||||
while ((line = br.readLine()) != null) {
|
||||
out.write(line + "\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the URL arguments from the query.
|
||||
*
|
||||
* @param map Original map of arguments.
|
||||
* @return Arguments ready to be attached to the URL.
|
||||
*/
|
||||
private static String getURLArguments(Map<String, String[]> map) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (Entry<String, String[]> entry : map.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
String[] value = entry.getValue();
|
||||
if (sb.length() > 0) {
|
||||
sb.append("&");
|
||||
}
|
||||
sb.append(key);
|
||||
sb.append("=");
|
||||
sb.append(value[0]);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetAddress;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.http.HttpServlet;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/**
|
||||
* This class is used in Namesystem's web server to do fsck on namenode.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RouterFsckServlet extends HttpServlet {
|
||||
/** for java.io.Serializable. */
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public static final String SERVLET_NAME = "fsck";
|
||||
public static final String PATH_SPEC = "/fsck";
|
||||
|
||||
/** Handle fsck request. */
|
||||
@Override
|
||||
public void doGet(HttpServletRequest request, HttpServletResponse response)
|
||||
throws IOException {
|
||||
final Map<String, String[]> pmap = request.getParameterMap();
|
||||
final PrintWriter out = response.getWriter();
|
||||
final InetAddress remoteAddress =
|
||||
InetAddress.getByName(request.getRemoteAddr());
|
||||
final ServletContext context = getServletContext();
|
||||
final Configuration conf = RouterHttpServer.getConfFromContext(context);
|
||||
final UserGroupInformation ugi = getUGI(request, conf);
|
||||
try {
|
||||
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
|
||||
Router router = RouterHttpServer.getRouterFromContext(context);
|
||||
new RouterFsck(router, pmap, out, remoteAddress).fsck();
|
||||
return null;
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
response.sendError(HttpURLConnection.HTTP_BAD_REQUEST, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy from {@link org.apache.hadoop.hdfs.server.namenode.DfsServlet}.
|
||||
* @param request Http request from the user
|
||||
* @param conf configuration
|
||||
* @return ugi of the requested user
|
||||
* @throws IOException failed to get ugi
|
||||
*/
|
||||
protected UserGroupInformation getUGI(HttpServletRequest request,
|
||||
Configuration conf) throws IOException {
|
||||
return JspHelper.getUGI(getServletContext(), request, conf);
|
||||
}
|
||||
}
|
|
@ -118,10 +118,13 @@ public class RouterHttpServer extends AbstractService {
|
|||
|
||||
private static void setupServlets(
|
||||
HttpServer2 httpServer, Configuration conf) {
|
||||
// TODO Add servlets for FSCK, etc
|
||||
httpServer.addInternalServlet(IsRouterActiveServlet.SERVLET_NAME,
|
||||
IsRouterActiveServlet.PATH_SPEC,
|
||||
IsRouterActiveServlet.class);
|
||||
httpServer.addInternalServlet(RouterFsckServlet.SERVLET_NAME,
|
||||
RouterFsckServlet.PATH_SPEC,
|
||||
RouterFsckServlet.class,
|
||||
true);
|
||||
}
|
||||
|
||||
public InetSocketAddress getHttpAddress() {
|
||||
|
@ -132,6 +135,10 @@ public class RouterHttpServer extends AbstractService {
|
|||
return this.httpsAddress;
|
||||
}
|
||||
|
||||
static Configuration getConfFromContext(ServletContext context) {
|
||||
return (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
|
||||
}
|
||||
|
||||
public static Router getRouterFromContext(ServletContext context) {
|
||||
return (Router)context.getAttribute(NAMENODE_ATTRIBUTE_KEY);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,218 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* End-to-end tests for fsck via DFSRouter.
|
||||
*/
|
||||
public class TestRouterFsck {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestRouterFsck.class);
|
||||
|
||||
private static StateStoreDFSCluster cluster;
|
||||
private static MiniRouterDFSCluster.RouterContext routerContext;
|
||||
private static MountTableResolver mountTable;
|
||||
private static FileSystem routerFs;
|
||||
private static InetSocketAddress webAddress;
|
||||
private static List<MembershipState> memberships;
|
||||
|
||||
@BeforeClass
|
||||
public static void globalSetUp() throws Exception {
|
||||
// Build and start a federated cluster
|
||||
cluster = new StateStoreDFSCluster(false, 2);
|
||||
Configuration conf = new RouterConfigBuilder()
|
||||
.stateStore()
|
||||
.admin()
|
||||
.rpc()
|
||||
.http()
|
||||
.build();
|
||||
cluster.addRouterOverrides(conf);
|
||||
cluster.startCluster();
|
||||
cluster.startRouters();
|
||||
cluster.waitClusterUp();
|
||||
|
||||
// Get the end points
|
||||
routerContext = cluster.getRandomRouter();
|
||||
routerFs = routerContext.getFileSystem();
|
||||
Router router = routerContext.getRouter();
|
||||
mountTable = (MountTableResolver) router.getSubclusterResolver();
|
||||
webAddress = router.getHttpServerAddress();
|
||||
assertNotNull(webAddress);
|
||||
|
||||
StateStoreService stateStore = routerContext.getRouter().getStateStore();
|
||||
MembershipStore membership =
|
||||
stateStore.getRegisteredRecordStore(MembershipStore.class);
|
||||
GetNamenodeRegistrationsRequest request =
|
||||
GetNamenodeRegistrationsRequest.newInstance();
|
||||
GetNamenodeRegistrationsResponse response =
|
||||
membership.getNamenodeRegistrations(request);
|
||||
memberships = response.getNamenodeMemberships();
|
||||
Collections.sort(memberships);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
if (cluster != null) {
|
||||
cluster.stopRouter(routerContext);
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void clearMountTable() throws IOException {
|
||||
RouterClient client = routerContext.getAdminClient();
|
||||
MountTableManager mountTableManager = client.getMountTableManager();
|
||||
GetMountTableEntriesRequest req1 =
|
||||
GetMountTableEntriesRequest.newInstance("/");
|
||||
GetMountTableEntriesResponse response =
|
||||
mountTableManager.getMountTableEntries(req1);
|
||||
for (MountTable entry : response.getEntries()) {
|
||||
RemoveMountTableEntryRequest req2 =
|
||||
RemoveMountTableEntryRequest.newInstance(entry.getSourcePath());
|
||||
mountTableManager.removeMountTableEntry(req2);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean addMountTable(final MountTable entry) throws IOException {
|
||||
RouterClient client = routerContext.getAdminClient();
|
||||
MountTableManager mountTableManager = client.getMountTableManager();
|
||||
AddMountTableEntryRequest addRequest =
|
||||
AddMountTableEntryRequest.newInstance(entry);
|
||||
AddMountTableEntryResponse addResponse =
|
||||
mountTableManager.addMountTableEntry(addRequest);
|
||||
// Reload the Router cache
|
||||
mountTable.loadCache(true);
|
||||
return addResponse.getStatus();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsck() throws Exception {
|
||||
MountTable addEntry = MountTable.newInstance("/testdir",
|
||||
Collections.singletonMap("ns0", "/testdir"));
|
||||
assertTrue(addMountTable(addEntry));
|
||||
addEntry = MountTable.newInstance("/testdir2",
|
||||
Collections.singletonMap("ns1", "/testdir2"));
|
||||
assertTrue(addMountTable(addEntry));
|
||||
// create 1 file on ns0
|
||||
routerFs.createNewFile(new Path("/testdir/testfile"));
|
||||
// create 3 files on ns1
|
||||
routerFs.createNewFile(new Path("/testdir2/testfile2"));
|
||||
routerFs.createNewFile(new Path("/testdir2/testfile3"));
|
||||
routerFs.createNewFile(new Path("/testdir2/testfile4"));
|
||||
|
||||
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
|
||||
// TODO: support https
|
||||
HttpGet httpGet = new HttpGet("http://" + webAddress.getHostName() +
|
||||
":" + webAddress.getPort() + "/fsck");
|
||||
try (CloseableHttpResponse httpResponse = httpClient.execute(httpGet)) {
|
||||
assertEquals(HttpStatus.SC_OK,
|
||||
httpResponse.getStatusLine().getStatusCode());
|
||||
String out = EntityUtils.toString(
|
||||
httpResponse.getEntity(), StandardCharsets.UTF_8);
|
||||
LOG.info(out);
|
||||
assertTrue(out.contains("Federated FSCK started"));
|
||||
// assert 1 file exists in a cluster and 3 files exist
|
||||
// in another cluster
|
||||
assertTrue(out.contains("Total files:\t1"));
|
||||
assertTrue(out.contains("Total files:\t3"));
|
||||
assertTrue(out.contains("Federated FSCK ended"));
|
||||
int nnCount = 0;
|
||||
for (MembershipState nn : memberships) {
|
||||
if (nn.getState() == FederationNamenodeServiceState.ACTIVE) {
|
||||
assertTrue(out.contains(
|
||||
"Checking " + nn + " at " + nn.getWebAddress() + "\n"));
|
||||
nnCount++;
|
||||
}
|
||||
}
|
||||
assertEquals(2, nnCount);
|
||||
}
|
||||
|
||||
// check if the argument is passed correctly
|
||||
httpGet = new HttpGet("http://" + webAddress.getHostName() +
|
||||
":" + webAddress.getPort() + "/fsck?path=/testdir");
|
||||
try (CloseableHttpResponse httpResponse = httpClient.execute(httpGet)) {
|
||||
assertEquals(HttpStatus.SC_OK,
|
||||
httpResponse.getStatusLine().getStatusCode());
|
||||
String out = EntityUtils.toString(
|
||||
httpResponse.getEntity(), StandardCharsets.UTF_8);
|
||||
LOG.info(out);
|
||||
assertTrue(out.contains("Federated FSCK started"));
|
||||
assertTrue(out.contains("Total files:\t1"));
|
||||
// ns1 does not have files under /testdir
|
||||
assertFalse(out.contains("Total files:\t3"));
|
||||
assertTrue(out.contains("Federated FSCK ended"));
|
||||
int nnCount = 0;
|
||||
for (MembershipState nn : memberships) {
|
||||
if (nn.getState() == FederationNamenodeServiceState.ACTIVE) {
|
||||
assertTrue(out.contains(
|
||||
"Checking " + nn + " at " + nn.getWebAddress() + "\n"));
|
||||
nnCount++;
|
||||
}
|
||||
}
|
||||
assertEquals(2, nnCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,7 +30,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
/**
|
||||
* A base class for the servlets in DFS.
|
||||
*/
|
||||
abstract class DfsServlet extends HttpServlet {
|
||||
public abstract class DfsServlet extends HttpServlet {
|
||||
/** For java.io.Serializable */
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
|
|
Loading…
Reference in New Issue