HDFS-15711. Add Metrics to HttpFS Server. (#2521) Contributed by Ahmed Hussein and Kihwal Lee
This commit is contained in:
parent
6258405f10
commit
777f426d6a
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.fs.http.server;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
|
@ -39,7 +40,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.lib.service.FileSystemAccess;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.json.simple.JSONArray;
|
||||
|
@ -65,7 +65,22 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.HTTP_BUFFER_SIZE_DEFAULT;
|
|||
* FileSystem operation executors used by {@link HttpFSServer}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class FSOperations {
|
||||
public final class FSOperations {
|
||||
|
||||
private static int bufferSize = 4096;
|
||||
|
||||
private FSOperations() {
|
||||
// not called
|
||||
}
|
||||
/**
|
||||
* Set the buffer size. The size is set during the initialization of
|
||||
* HttpFSServerWebApp.
|
||||
* @param conf the configuration to get the bufferSize
|
||||
*/
|
||||
public static void setBufferSize(Configuration conf) {
|
||||
bufferSize = conf.getInt(HTTPFS_BUFFER_SIZE_KEY,
|
||||
HTTP_BUFFER_SIZE_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param fileStatus a FileStatus object
|
||||
|
@ -419,10 +434,9 @@ public class FSOperations {
|
|||
*/
|
||||
@Override
|
||||
public Void execute(FileSystem fs) throws IOException {
|
||||
int bufferSize = fs.getConf().getInt("httpfs.buffer.size", 4096);
|
||||
OutputStream os = fs.append(path, bufferSize);
|
||||
IOUtils.copyBytes(is, os, bufferSize, true);
|
||||
os.close();
|
||||
long bytes = copyBytes(is, os);
|
||||
HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes);
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -505,6 +519,7 @@ public class FSOperations {
|
|||
@Override
|
||||
public JSONObject execute(FileSystem fs) throws IOException {
|
||||
boolean result = fs.truncate(path, newLength);
|
||||
HttpFSServerWebApp.get().getMetrics().incrOpsTruncate();
|
||||
return toJSON(
|
||||
StringUtils.toLowerCase(HttpFSFileSystem.TRUNCATE_JSON), result);
|
||||
}
|
||||
|
@ -621,16 +636,65 @@ public class FSOperations {
|
|||
fsPermission = FsCreateModes.create(fsPermission,
|
||||
new FsPermission(unmaskedPermission));
|
||||
}
|
||||
int bufferSize = fs.getConf().getInt(HTTPFS_BUFFER_SIZE_KEY,
|
||||
HTTP_BUFFER_SIZE_DEFAULT);
|
||||
OutputStream os = fs.create(path, fsPermission, override, bufferSize, replication, blockSize, null);
|
||||
IOUtils.copyBytes(is, os, bufferSize, true);
|
||||
os.close();
|
||||
long bytes = copyBytes(is, os);
|
||||
HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes);
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* These copyBytes methods combines the two different flavors used originally.
|
||||
* One with length and another one with buffer size.
|
||||
* In this impl, buffer size is determined internally, which is a singleton
|
||||
* normally set during initialization.
|
||||
* @param in the inputStream
|
||||
* @param out the outputStream
|
||||
* @return the totalBytes
|
||||
* @throws IOException the exception to be thrown.
|
||||
*/
|
||||
public static long copyBytes(InputStream in, OutputStream out)
|
||||
throws IOException {
|
||||
return copyBytes(in, out, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
public static long copyBytes(InputStream in, OutputStream out, long count)
|
||||
throws IOException {
|
||||
long totalBytes = 0;
|
||||
|
||||
// If bufferSize is not initialized use 4k. This will not happen
|
||||
// if all callers check and set it.
|
||||
byte[] buf = new byte[bufferSize];
|
||||
long bytesRemaining = count;
|
||||
int bytesRead;
|
||||
|
||||
try {
|
||||
while (bytesRemaining > 0) {
|
||||
int bytesToRead = (int)
|
||||
(bytesRemaining < buf.length ? bytesRemaining : buf.length);
|
||||
|
||||
bytesRead = in.read(buf, 0, bytesToRead);
|
||||
if (bytesRead == -1) {
|
||||
break;
|
||||
}
|
||||
|
||||
out.write(buf, 0, bytesRead);
|
||||
bytesRemaining -= bytesRead;
|
||||
totalBytes += bytesRead;
|
||||
}
|
||||
return totalBytes;
|
||||
} finally {
|
||||
// Originally IOUtils.copyBytes() were called with close=true. So we are
|
||||
// implementing the same behavior here.
|
||||
try {
|
||||
in.close();
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executor that performs a delete FileSystemAccess files system operation.
|
||||
*/
|
||||
|
@ -663,6 +727,7 @@ public class FSOperations {
|
|||
@Override
|
||||
public JSONObject execute(FileSystem fs) throws IOException {
|
||||
boolean deleted = fs.delete(path, recursive);
|
||||
HttpFSServerWebApp.get().getMetrics().incrOpsDelete();
|
||||
return toJSON(
|
||||
StringUtils.toLowerCase(HttpFSFileSystem.DELETE_JSON), deleted);
|
||||
}
|
||||
|
@ -731,6 +796,7 @@ public class FSOperations {
|
|||
@Override
|
||||
public Map execute(FileSystem fs) throws IOException {
|
||||
FileStatus status = fs.getFileStatus(path);
|
||||
HttpFSServerWebApp.get().getMetrics().incrOpsStat();
|
||||
return toJson(status);
|
||||
}
|
||||
|
||||
|
@ -759,7 +825,6 @@ public class FSOperations {
|
|||
json.put(HttpFSFileSystem.HOME_DIR_JSON, homeDir.toUri().getPath());
|
||||
return json;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -797,6 +862,7 @@ public class FSOperations {
|
|||
@Override
|
||||
public Map execute(FileSystem fs) throws IOException {
|
||||
FileStatus[] fileStatuses = fs.listStatus(path, filter);
|
||||
HttpFSServerWebApp.get().getMetrics().incrOpsListing();
|
||||
return toJson(fileStatuses, fs.getFileStatus(path).isFile());
|
||||
}
|
||||
|
||||
|
@ -888,6 +954,7 @@ public class FSOperations {
|
|||
new FsPermission(unmaskedPermission));
|
||||
}
|
||||
boolean mkdirs = fs.mkdirs(path, fsPermission);
|
||||
HttpFSServerWebApp.get().getMetrics().incrOpsMkdir();
|
||||
return toJSON(HttpFSFileSystem.MKDIRS_JSON, mkdirs);
|
||||
}
|
||||
|
||||
|
@ -920,8 +987,8 @@ public class FSOperations {
|
|||
*/
|
||||
@Override
|
||||
public InputStream execute(FileSystem fs) throws IOException {
|
||||
int bufferSize = HttpFSServerWebApp.get().getConfig().getInt(
|
||||
HTTPFS_BUFFER_SIZE_KEY, HTTP_BUFFER_SIZE_DEFAULT);
|
||||
// Only updating ops count. bytesRead is updated in InputStreamEntity
|
||||
HttpFSServerWebApp.get().getMetrics().incrOpsOpen();
|
||||
return fs.open(path, bufferSize);
|
||||
}
|
||||
|
||||
|
@ -959,6 +1026,7 @@ public class FSOperations {
|
|||
@Override
|
||||
public JSONObject execute(FileSystem fs) throws IOException {
|
||||
boolean renamed = fs.rename(path, toPath);
|
||||
HttpFSServerWebApp.get().getMetrics().incrOpsRename();
|
||||
return toJSON(HttpFSFileSystem.RENAME_JSON, renamed);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,9 +21,13 @@ package org.apache.hadoop.fs.http.server;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics;
|
||||
import org.apache.hadoop.lib.server.ServerException;
|
||||
import org.apache.hadoop.lib.service.FileSystemAccess;
|
||||
import org.apache.hadoop.lib.servlet.ServerWebApp;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -56,6 +60,7 @@ public class HttpFSServerWebApp extends ServerWebApp {
|
|||
public static final String CONF_ADMIN_GROUP = "admin.group";
|
||||
|
||||
private static HttpFSServerWebApp SERVER;
|
||||
private static HttpFSServerMetrics metrics;
|
||||
|
||||
private String adminGroup;
|
||||
|
||||
|
@ -102,6 +107,7 @@ public class HttpFSServerWebApp extends ServerWebApp {
|
|||
LOG.info("Connects to Namenode [{}]",
|
||||
get().get(FileSystemAccess.class).getFileSystemConfiguration().
|
||||
get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
|
||||
setMetrics(getConfig());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -110,9 +116,22 @@ public class HttpFSServerWebApp extends ServerWebApp {
|
|||
@Override
|
||||
public void destroy() {
|
||||
SERVER = null;
|
||||
if (metrics != null) {
|
||||
metrics.shutdown();
|
||||
}
|
||||
super.destroy();
|
||||
}
|
||||
|
||||
private static void setMetrics(Configuration config) {
|
||||
LOG.info("Initializing HttpFSServerMetrics");
|
||||
metrics = HttpFSServerMetrics.create(config, "HttpFSServer");
|
||||
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
|
||||
pauseMonitor.init(config);
|
||||
pauseMonitor.start();
|
||||
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
|
||||
FSOperations.setBufferSize(config);
|
||||
DefaultMetricsSystem.initialize("HttpFSServer");
|
||||
}
|
||||
/**
|
||||
* Returns HttpFSServer server singleton, configuration and services are
|
||||
* accessible through it.
|
||||
|
@ -123,6 +142,14 @@ public class HttpFSServerWebApp extends ServerWebApp {
|
|||
return SERVER;
|
||||
}
|
||||
|
||||
/**
|
||||
* gets the HttpFSServerMetrics instance.
|
||||
* @return the HttpFSServerMetrics singleton.
|
||||
*/
|
||||
public static HttpFSServerMetrics getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns HttpFSServer admin group.
|
||||
*
|
||||
|
|
|
@ -0,0 +1,163 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs.http.server.metrics;
|
||||
|
||||
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
/**
|
||||
*
|
||||
* This class is for maintaining the various HttpFSServer statistics
|
||||
* and publishing them through the metrics interfaces.
|
||||
* This also registers the JMX MBean for RPC.
|
||||
* <p>
|
||||
* This class has a number of metrics variables that are publicly accessible;
|
||||
* these variables (objects) have methods to update their values;
|
||||
* for example:
|
||||
* <p> {@link #bytesRead}.inc()
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@Metrics(about="HttpFSServer metrics", context="httpfs")
|
||||
public class HttpFSServerMetrics {
|
||||
|
||||
private @Metric MutableCounterLong bytesWritten;
|
||||
private @Metric MutableCounterLong bytesRead;
|
||||
|
||||
// Write ops
|
||||
private @Metric MutableCounterLong opsCreate;
|
||||
private @Metric MutableCounterLong opsAppend;
|
||||
private @Metric MutableCounterLong opsTruncate;
|
||||
private @Metric MutableCounterLong opsDelete;
|
||||
private @Metric MutableCounterLong opsRename;
|
||||
private @Metric MutableCounterLong opsMkdir;
|
||||
|
||||
// Read ops
|
||||
private @Metric MutableCounterLong opsOpen;
|
||||
private @Metric MutableCounterLong opsListing;
|
||||
private @Metric MutableCounterLong opsStat;
|
||||
private @Metric MutableCounterLong opsCheckAccess;
|
||||
|
||||
private final MetricsRegistry registry = new MetricsRegistry("httpfsserver");
|
||||
private final String name;
|
||||
private JvmMetrics jvmMetrics = null;
|
||||
|
||||
public HttpFSServerMetrics(String name, String sessionId,
|
||||
final JvmMetrics jvmMetrics) {
|
||||
this.name = name;
|
||||
this.jvmMetrics = jvmMetrics;
|
||||
registry.tag(SessionId, sessionId);
|
||||
}
|
||||
|
||||
public static HttpFSServerMetrics create(Configuration conf,
|
||||
String serverName) {
|
||||
String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
JvmMetrics jm = JvmMetrics.create("HttpFSServer", sessionId, ms);
|
||||
String name = "ServerActivity-"+ (serverName.isEmpty()
|
||||
? "UndefinedServer"+ ThreadLocalRandom.current().nextInt()
|
||||
: serverName.replace(':', '-'));
|
||||
|
||||
return ms.register(name, null, new HttpFSServerMetrics(name,
|
||||
sessionId, jm));
|
||||
}
|
||||
|
||||
public String name() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public JvmMetrics getJvmMetrics() {
|
||||
return jvmMetrics;
|
||||
}
|
||||
|
||||
public void incrBytesWritten(long bytes) {
|
||||
bytesWritten.incr(bytes);
|
||||
}
|
||||
|
||||
public void incrBytesRead(long bytes) {
|
||||
bytesRead.incr(bytes);
|
||||
}
|
||||
|
||||
public void incrOpsCreate() {
|
||||
opsCreate.incr();
|
||||
}
|
||||
|
||||
public void incrOpsAppend() {
|
||||
opsAppend.incr();
|
||||
}
|
||||
|
||||
public void incrOpsTruncate() {
|
||||
opsTruncate.incr();
|
||||
}
|
||||
|
||||
public void incrOpsDelete() {
|
||||
opsDelete.incr();
|
||||
}
|
||||
|
||||
public void incrOpsRename() {
|
||||
opsRename.incr();
|
||||
}
|
||||
|
||||
public void incrOpsMkdir() {
|
||||
opsMkdir.incr();
|
||||
}
|
||||
|
||||
public void incrOpsOpen() {
|
||||
opsOpen.incr();
|
||||
}
|
||||
|
||||
public void incrOpsListing() {
|
||||
opsListing.incr();
|
||||
}
|
||||
|
||||
public void incrOpsStat() {
|
||||
opsStat.incr();
|
||||
}
|
||||
|
||||
public void incrOpsCheckAccess() {
|
||||
opsCheckAccess.incr();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
DefaultMetricsSystem.shutdown();
|
||||
}
|
||||
|
||||
public long getOpsMkdir() {
|
||||
return opsMkdir.value();
|
||||
}
|
||||
|
||||
public long getOpsListing() {
|
||||
return opsListing.value();
|
||||
}
|
||||
|
||||
public long getOpsStat() {
|
||||
return opsStat.value();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A package to implement metrics for the HttpFS Server.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
package org.apache.hadoop.fs.http.server.metrics;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -19,6 +19,9 @@
|
|||
package org.apache.hadoop.lib.wsrs;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.http.server.FSOperations;
|
||||
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
|
||||
import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
import javax.ws.rs.core.StreamingOutput;
|
||||
|
@ -45,10 +48,17 @@ public class InputStreamEntity implements StreamingOutput {
|
|||
@Override
|
||||
public void write(OutputStream os) throws IOException {
|
||||
IOUtils.skipFully(is, offset);
|
||||
long bytes = 0L;
|
||||
if (len == -1) {
|
||||
IOUtils.copyBytes(is, os, 4096, true);
|
||||
// Use the configured buffer size instead of hardcoding to 4k
|
||||
bytes = FSOperations.copyBytes(is, os);
|
||||
} else {
|
||||
IOUtils.copyBytes(is, os, len, true);
|
||||
bytes = FSOperations.copyBytes(is, os, len);
|
||||
}
|
||||
// Update metrics.
|
||||
HttpFSServerMetrics metrics = HttpFSServerWebApp.get().getMetrics();
|
||||
if (metrics != null) {
|
||||
metrics.incrBytesRead(bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import java.net.URL;
|
|||
import java.text.MessageFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -78,6 +79,7 @@ import org.eclipse.jetty.webapp.WebAppContext;
|
|||
|
||||
import com.google.common.collect.Maps;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
||||
|
||||
|
@ -86,6 +88,23 @@ import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
|||
*/
|
||||
public class TestHttpFSServer extends HFSTestCase {
|
||||
|
||||
/**
|
||||
* define metric getters for unit tests.
|
||||
*/
|
||||
private static Callable<Long> defaultEntryMetricGetter = () -> 0L;
|
||||
private static Callable<Long> defaultExitMetricGetter = () -> 1L;
|
||||
private static HashMap<String, Callable<Long>> metricsGetter =
|
||||
new HashMap<String, Callable<Long>>() {
|
||||
{
|
||||
put("LISTSTATUS",
|
||||
() -> HttpFSServerWebApp.get().getMetrics().getOpsListing());
|
||||
put("MKDIRS",
|
||||
() -> HttpFSServerWebApp.get().getMetrics().getOpsMkdir());
|
||||
put("GETFILESTATUS",
|
||||
() -> HttpFSServerWebApp.get().getMetrics().getOpsStat());
|
||||
}
|
||||
};
|
||||
|
||||
@Test
|
||||
@TestDir
|
||||
@TestJetty
|
||||
|
@ -350,7 +369,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
@TestHdfs
|
||||
public void testHdfsAccess() throws Exception {
|
||||
createHttpFSServer(false, false);
|
||||
|
||||
long oldOpsListStatus =
|
||||
metricsGetter.get("LISTSTATUS").call();
|
||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||
URL url = new URL(TestJettyHelper.getJettyURL(),
|
||||
MessageFormat.format("/webhdfs/v1/?user.name={0}&op=liststatus",
|
||||
|
@ -361,6 +381,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
new InputStreamReader(conn.getInputStream()));
|
||||
reader.readLine();
|
||||
reader.close();
|
||||
Assert.assertEquals(1 + oldOpsListStatus,
|
||||
(long) metricsGetter.get("LISTSTATUS").call());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -369,7 +391,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
@TestHdfs
|
||||
public void testMkdirs() throws Exception {
|
||||
createHttpFSServer(false, false);
|
||||
|
||||
long oldMkdirOpsStat =
|
||||
metricsGetter.get("MKDIRS").call();
|
||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||
URL url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
|
||||
"/webhdfs/v1/tmp/sub-tmp?user.name={0}&op=MKDIRS", user));
|
||||
|
@ -377,8 +400,10 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
conn.setRequestMethod("PUT");
|
||||
conn.connect();
|
||||
Assert.assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_OK);
|
||||
|
||||
getStatus("/tmp/sub-tmp", "LISTSTATUS");
|
||||
long opsStat =
|
||||
metricsGetter.get("MKDIRS").call();
|
||||
Assert.assertEquals(1 + oldMkdirOpsStat, opsStat);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -387,7 +412,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
@TestHdfs
|
||||
public void testGlobFilter() throws Exception {
|
||||
createHttpFSServer(false, false);
|
||||
|
||||
long oldOpsListStatus =
|
||||
metricsGetter.get("LISTSTATUS").call();
|
||||
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
|
||||
fs.mkdirs(new Path("/tmp"));
|
||||
fs.create(new Path("/tmp/foo.txt")).close();
|
||||
|
@ -402,6 +428,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
new InputStreamReader(conn.getInputStream()));
|
||||
reader.readLine();
|
||||
reader.close();
|
||||
Assert.assertEquals(1 + oldOpsListStatus,
|
||||
(long) metricsGetter.get("LISTSTATUS").call());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -461,6 +489,9 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
*/
|
||||
private void createDirWithHttp(String dirname, String perms,
|
||||
String unmaskedPerms) throws Exception {
|
||||
// get the createDirMetrics
|
||||
long oldOpsMkdir =
|
||||
metricsGetter.get("MKDIRS").call();
|
||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||
// Remove leading / from filename
|
||||
if (dirname.charAt(0) == '/') {
|
||||
|
@ -484,6 +515,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
conn.setRequestMethod("PUT");
|
||||
conn.connect();
|
||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
Assert.assertEquals(1 + oldOpsMkdir,
|
||||
(long) metricsGetter.get("MKDIRS").call());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -497,6 +530,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
*/
|
||||
private String getStatus(String filename, String command)
|
||||
throws Exception {
|
||||
long oldOpsStat =
|
||||
metricsGetter.getOrDefault(command, defaultEntryMetricGetter).call();
|
||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||
// Remove leading / from filename
|
||||
if (filename.charAt(0) == '/') {
|
||||
|
@ -512,7 +547,9 @@ public class TestHttpFSServer extends HFSTestCase {
|
|||
|
||||
BufferedReader reader =
|
||||
new BufferedReader(new InputStreamReader(conn.getInputStream()));
|
||||
|
||||
long opsStat =
|
||||
metricsGetter.getOrDefault(command, defaultExitMetricGetter).call();
|
||||
Assert.assertEquals(oldOpsStat + 1L, opsStat);
|
||||
return reader.readLine();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue