HDFS-15711. Add Metrics to HttpFS Server. (#2521) Contributed by Ahmed Hussein and Kihwal Lee
This commit is contained in:
parent
1570bddb79
commit
76316c4bc0
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.fs.http.server;
|
package org.apache.hadoop.fs.http.server;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
|
@ -43,7 +44,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
|
||||||
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
|
||||||
import org.apache.hadoop.hdfs.web.JsonUtil;
|
import org.apache.hadoop.hdfs.web.JsonUtil;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
import org.apache.hadoop.lib.service.FileSystemAccess;
|
import org.apache.hadoop.lib.service.FileSystemAccess;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.json.simple.JSONArray;
|
import org.json.simple.JSONArray;
|
||||||
|
@ -69,7 +69,22 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.HTTP_BUFFER_SIZE_DEFAULT;
|
||||||
* FileSystem operation executors used by {@link HttpFSServer}.
|
* FileSystem operation executors used by {@link HttpFSServer}.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class FSOperations {
|
public final class FSOperations {
|
||||||
|
|
||||||
|
private static int bufferSize = 4096;
|
||||||
|
|
||||||
|
private FSOperations() {
|
||||||
|
// not called
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Set the buffer size. The size is set during the initialization of
|
||||||
|
* HttpFSServerWebApp.
|
||||||
|
* @param conf the configuration to get the bufferSize
|
||||||
|
*/
|
||||||
|
public static void setBufferSize(Configuration conf) {
|
||||||
|
bufferSize = conf.getInt(HTTPFS_BUFFER_SIZE_KEY,
|
||||||
|
HTTP_BUFFER_SIZE_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param fileStatus a FileStatus object
|
* @param fileStatus a FileStatus object
|
||||||
|
@ -423,10 +438,9 @@ public class FSOperations {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Void execute(FileSystem fs) throws IOException {
|
public Void execute(FileSystem fs) throws IOException {
|
||||||
int bufferSize = fs.getConf().getInt("httpfs.buffer.size", 4096);
|
|
||||||
OutputStream os = fs.append(path, bufferSize);
|
OutputStream os = fs.append(path, bufferSize);
|
||||||
IOUtils.copyBytes(is, os, bufferSize, true);
|
long bytes = copyBytes(is, os);
|
||||||
os.close();
|
HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,6 +523,7 @@ public class FSOperations {
|
||||||
@Override
|
@Override
|
||||||
public JSONObject execute(FileSystem fs) throws IOException {
|
public JSONObject execute(FileSystem fs) throws IOException {
|
||||||
boolean result = fs.truncate(path, newLength);
|
boolean result = fs.truncate(path, newLength);
|
||||||
|
HttpFSServerWebApp.get().getMetrics().incrOpsTruncate();
|
||||||
return toJSON(
|
return toJSON(
|
||||||
StringUtils.toLowerCase(HttpFSFileSystem.TRUNCATE_JSON), result);
|
StringUtils.toLowerCase(HttpFSFileSystem.TRUNCATE_JSON), result);
|
||||||
}
|
}
|
||||||
|
@ -625,16 +640,65 @@ public class FSOperations {
|
||||||
fsPermission = FsCreateModes.create(fsPermission,
|
fsPermission = FsCreateModes.create(fsPermission,
|
||||||
new FsPermission(unmaskedPermission));
|
new FsPermission(unmaskedPermission));
|
||||||
}
|
}
|
||||||
int bufferSize = fs.getConf().getInt(HTTPFS_BUFFER_SIZE_KEY,
|
|
||||||
HTTP_BUFFER_SIZE_DEFAULT);
|
|
||||||
OutputStream os = fs.create(path, fsPermission, override, bufferSize, replication, blockSize, null);
|
OutputStream os = fs.create(path, fsPermission, override, bufferSize, replication, blockSize, null);
|
||||||
IOUtils.copyBytes(is, os, bufferSize, true);
|
long bytes = copyBytes(is, os);
|
||||||
os.close();
|
HttpFSServerWebApp.get().getMetrics().incrBytesWritten(bytes);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* These copyBytes methods combines the two different flavors used originally.
|
||||||
|
* One with length and another one with buffer size.
|
||||||
|
* In this impl, buffer size is determined internally, which is a singleton
|
||||||
|
* normally set during initialization.
|
||||||
|
* @param in the inputStream
|
||||||
|
* @param out the outputStream
|
||||||
|
* @return the totalBytes
|
||||||
|
* @throws IOException the exception to be thrown.
|
||||||
|
*/
|
||||||
|
public static long copyBytes(InputStream in, OutputStream out)
|
||||||
|
throws IOException {
|
||||||
|
return copyBytes(in, out, Long.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long copyBytes(InputStream in, OutputStream out, long count)
|
||||||
|
throws IOException {
|
||||||
|
long totalBytes = 0;
|
||||||
|
|
||||||
|
// If bufferSize is not initialized use 4k. This will not happen
|
||||||
|
// if all callers check and set it.
|
||||||
|
byte[] buf = new byte[bufferSize];
|
||||||
|
long bytesRemaining = count;
|
||||||
|
int bytesRead;
|
||||||
|
|
||||||
|
try {
|
||||||
|
while (bytesRemaining > 0) {
|
||||||
|
int bytesToRead = (int)
|
||||||
|
(bytesRemaining < buf.length ? bytesRemaining : buf.length);
|
||||||
|
|
||||||
|
bytesRead = in.read(buf, 0, bytesToRead);
|
||||||
|
if (bytesRead == -1) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
out.write(buf, 0, bytesRead);
|
||||||
|
bytesRemaining -= bytesRead;
|
||||||
|
totalBytes += bytesRead;
|
||||||
|
}
|
||||||
|
return totalBytes;
|
||||||
|
} finally {
|
||||||
|
// Originally IOUtils.copyBytes() were called with close=true. So we are
|
||||||
|
// implementing the same behavior here.
|
||||||
|
try {
|
||||||
|
in.close();
|
||||||
|
} finally {
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Executor that performs a delete FileSystemAccess files system operation.
|
* Executor that performs a delete FileSystemAccess files system operation.
|
||||||
*/
|
*/
|
||||||
|
@ -667,6 +731,7 @@ public class FSOperations {
|
||||||
@Override
|
@Override
|
||||||
public JSONObject execute(FileSystem fs) throws IOException {
|
public JSONObject execute(FileSystem fs) throws IOException {
|
||||||
boolean deleted = fs.delete(path, recursive);
|
boolean deleted = fs.delete(path, recursive);
|
||||||
|
HttpFSServerWebApp.get().getMetrics().incrOpsDelete();
|
||||||
return toJSON(
|
return toJSON(
|
||||||
StringUtils.toLowerCase(HttpFSFileSystem.DELETE_JSON), deleted);
|
StringUtils.toLowerCase(HttpFSFileSystem.DELETE_JSON), deleted);
|
||||||
}
|
}
|
||||||
|
@ -735,6 +800,7 @@ public class FSOperations {
|
||||||
@Override
|
@Override
|
||||||
public Map execute(FileSystem fs) throws IOException {
|
public Map execute(FileSystem fs) throws IOException {
|
||||||
FileStatus status = fs.getFileStatus(path);
|
FileStatus status = fs.getFileStatus(path);
|
||||||
|
HttpFSServerWebApp.get().getMetrics().incrOpsStat();
|
||||||
return toJson(status);
|
return toJson(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -763,7 +829,6 @@ public class FSOperations {
|
||||||
json.put(HttpFSFileSystem.HOME_DIR_JSON, homeDir.toUri().getPath());
|
json.put(HttpFSFileSystem.HOME_DIR_JSON, homeDir.toUri().getPath());
|
||||||
return json;
|
return json;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -801,6 +866,7 @@ public class FSOperations {
|
||||||
@Override
|
@Override
|
||||||
public Map execute(FileSystem fs) throws IOException {
|
public Map execute(FileSystem fs) throws IOException {
|
||||||
FileStatus[] fileStatuses = fs.listStatus(path, filter);
|
FileStatus[] fileStatuses = fs.listStatus(path, filter);
|
||||||
|
HttpFSServerWebApp.get().getMetrics().incrOpsListing();
|
||||||
return toJson(fileStatuses, fs.getFileStatus(path).isFile());
|
return toJson(fileStatuses, fs.getFileStatus(path).isFile());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -892,6 +958,7 @@ public class FSOperations {
|
||||||
new FsPermission(unmaskedPermission));
|
new FsPermission(unmaskedPermission));
|
||||||
}
|
}
|
||||||
boolean mkdirs = fs.mkdirs(path, fsPermission);
|
boolean mkdirs = fs.mkdirs(path, fsPermission);
|
||||||
|
HttpFSServerWebApp.get().getMetrics().incrOpsMkdir();
|
||||||
return toJSON(HttpFSFileSystem.MKDIRS_JSON, mkdirs);
|
return toJSON(HttpFSFileSystem.MKDIRS_JSON, mkdirs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -924,8 +991,8 @@ public class FSOperations {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public InputStream execute(FileSystem fs) throws IOException {
|
public InputStream execute(FileSystem fs) throws IOException {
|
||||||
int bufferSize = HttpFSServerWebApp.get().getConfig().getInt(
|
// Only updating ops count. bytesRead is updated in InputStreamEntity
|
||||||
HTTPFS_BUFFER_SIZE_KEY, HTTP_BUFFER_SIZE_DEFAULT);
|
HttpFSServerWebApp.get().getMetrics().incrOpsOpen();
|
||||||
return fs.open(path, bufferSize);
|
return fs.open(path, bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -963,6 +1030,7 @@ public class FSOperations {
|
||||||
@Override
|
@Override
|
||||||
public JSONObject execute(FileSystem fs) throws IOException {
|
public JSONObject execute(FileSystem fs) throws IOException {
|
||||||
boolean renamed = fs.rename(path, toPath);
|
boolean renamed = fs.rename(path, toPath);
|
||||||
|
HttpFSServerWebApp.get().getMetrics().incrOpsRename();
|
||||||
return toJSON(HttpFSFileSystem.RENAME_JSON, renamed);
|
return toJSON(HttpFSFileSystem.RENAME_JSON, renamed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,9 +21,13 @@ package org.apache.hadoop.fs.http.server;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics;
|
||||||
import org.apache.hadoop.lib.server.ServerException;
|
import org.apache.hadoop.lib.server.ServerException;
|
||||||
import org.apache.hadoop.lib.service.FileSystemAccess;
|
import org.apache.hadoop.lib.service.FileSystemAccess;
|
||||||
import org.apache.hadoop.lib.servlet.ServerWebApp;
|
import org.apache.hadoop.lib.servlet.ServerWebApp;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -56,6 +60,7 @@ public class HttpFSServerWebApp extends ServerWebApp {
|
||||||
public static final String CONF_ADMIN_GROUP = "admin.group";
|
public static final String CONF_ADMIN_GROUP = "admin.group";
|
||||||
|
|
||||||
private static HttpFSServerWebApp SERVER;
|
private static HttpFSServerWebApp SERVER;
|
||||||
|
private static HttpFSServerMetrics metrics;
|
||||||
|
|
||||||
private String adminGroup;
|
private String adminGroup;
|
||||||
|
|
||||||
|
@ -102,6 +107,7 @@ public class HttpFSServerWebApp extends ServerWebApp {
|
||||||
LOG.info("Connects to Namenode [{}]",
|
LOG.info("Connects to Namenode [{}]",
|
||||||
get().get(FileSystemAccess.class).getFileSystemConfiguration().
|
get().get(FileSystemAccess.class).getFileSystemConfiguration().
|
||||||
get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
|
get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
|
||||||
|
setMetrics(getConfig());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -110,9 +116,22 @@ public class HttpFSServerWebApp extends ServerWebApp {
|
||||||
@Override
|
@Override
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
SERVER = null;
|
SERVER = null;
|
||||||
|
if (metrics != null) {
|
||||||
|
metrics.shutdown();
|
||||||
|
}
|
||||||
super.destroy();
|
super.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void setMetrics(Configuration config) {
|
||||||
|
LOG.info("Initializing HttpFSServerMetrics");
|
||||||
|
metrics = HttpFSServerMetrics.create(config, "HttpFSServer");
|
||||||
|
JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
|
||||||
|
pauseMonitor.init(config);
|
||||||
|
pauseMonitor.start();
|
||||||
|
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
|
||||||
|
FSOperations.setBufferSize(config);
|
||||||
|
DefaultMetricsSystem.initialize("HttpFSServer");
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Returns HttpFSServer server singleton, configuration and services are
|
* Returns HttpFSServer server singleton, configuration and services are
|
||||||
* accessible through it.
|
* accessible through it.
|
||||||
|
@ -123,6 +142,14 @@ public class HttpFSServerWebApp extends ServerWebApp {
|
||||||
return SERVER;
|
return SERVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* gets the HttpFSServerMetrics instance.
|
||||||
|
* @return the HttpFSServerMetrics singleton.
|
||||||
|
*/
|
||||||
|
public static HttpFSServerMetrics getMetrics() {
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns HttpFSServer admin group.
|
* Returns HttpFSServer admin group.
|
||||||
*
|
*
|
||||||
|
|
|
@ -0,0 +1,163 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.http.server.metrics;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* This class is for maintaining the various HttpFSServer statistics
|
||||||
|
* and publishing them through the metrics interfaces.
|
||||||
|
* This also registers the JMX MBean for RPC.
|
||||||
|
* <p>
|
||||||
|
* This class has a number of metrics variables that are publicly accessible;
|
||||||
|
* these variables (objects) have methods to update their values;
|
||||||
|
* for example:
|
||||||
|
* <p> {@link #bytesRead}.inc()
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@Metrics(about="HttpFSServer metrics", context="httpfs")
|
||||||
|
public class HttpFSServerMetrics {
|
||||||
|
|
||||||
|
private @Metric MutableCounterLong bytesWritten;
|
||||||
|
private @Metric MutableCounterLong bytesRead;
|
||||||
|
|
||||||
|
// Write ops
|
||||||
|
private @Metric MutableCounterLong opsCreate;
|
||||||
|
private @Metric MutableCounterLong opsAppend;
|
||||||
|
private @Metric MutableCounterLong opsTruncate;
|
||||||
|
private @Metric MutableCounterLong opsDelete;
|
||||||
|
private @Metric MutableCounterLong opsRename;
|
||||||
|
private @Metric MutableCounterLong opsMkdir;
|
||||||
|
|
||||||
|
// Read ops
|
||||||
|
private @Metric MutableCounterLong opsOpen;
|
||||||
|
private @Metric MutableCounterLong opsListing;
|
||||||
|
private @Metric MutableCounterLong opsStat;
|
||||||
|
private @Metric MutableCounterLong opsCheckAccess;
|
||||||
|
|
||||||
|
private final MetricsRegistry registry = new MetricsRegistry("httpfsserver");
|
||||||
|
private final String name;
|
||||||
|
private JvmMetrics jvmMetrics = null;
|
||||||
|
|
||||||
|
public HttpFSServerMetrics(String name, String sessionId,
|
||||||
|
final JvmMetrics jvmMetrics) {
|
||||||
|
this.name = name;
|
||||||
|
this.jvmMetrics = jvmMetrics;
|
||||||
|
registry.tag(SessionId, sessionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HttpFSServerMetrics create(Configuration conf,
|
||||||
|
String serverName) {
|
||||||
|
String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
JvmMetrics jm = JvmMetrics.create("HttpFSServer", sessionId, ms);
|
||||||
|
String name = "ServerActivity-"+ (serverName.isEmpty()
|
||||||
|
? "UndefinedServer"+ ThreadLocalRandom.current().nextInt()
|
||||||
|
: serverName.replace(':', '-'));
|
||||||
|
|
||||||
|
return ms.register(name, null, new HttpFSServerMetrics(name,
|
||||||
|
sessionId, jm));
|
||||||
|
}
|
||||||
|
|
||||||
|
public String name() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public JvmMetrics getJvmMetrics() {
|
||||||
|
return jvmMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrBytesWritten(long bytes) {
|
||||||
|
bytesWritten.incr(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrBytesRead(long bytes) {
|
||||||
|
bytesRead.incr(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsCreate() {
|
||||||
|
opsCreate.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsAppend() {
|
||||||
|
opsAppend.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsTruncate() {
|
||||||
|
opsTruncate.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsDelete() {
|
||||||
|
opsDelete.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsRename() {
|
||||||
|
opsRename.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsMkdir() {
|
||||||
|
opsMkdir.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsOpen() {
|
||||||
|
opsOpen.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsListing() {
|
||||||
|
opsListing.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsStat() {
|
||||||
|
opsStat.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrOpsCheckAccess() {
|
||||||
|
opsCheckAccess.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() {
|
||||||
|
DefaultMetricsSystem.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOpsMkdir() {
|
||||||
|
return opsMkdir.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOpsListing() {
|
||||||
|
return opsListing.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getOpsStat() {
|
||||||
|
return opsStat.value();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A package to implement metrics for the HttpFS Server.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
package org.apache.hadoop.fs.http.server.metrics;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
@ -19,6 +19,9 @@
|
||||||
package org.apache.hadoop.lib.wsrs;
|
package org.apache.hadoop.lib.wsrs;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.fs.http.server.FSOperations;
|
||||||
|
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
|
||||||
|
import org.apache.hadoop.fs.http.server.metrics.HttpFSServerMetrics;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
import javax.ws.rs.core.StreamingOutput;
|
import javax.ws.rs.core.StreamingOutput;
|
||||||
|
@ -45,10 +48,17 @@ public class InputStreamEntity implements StreamingOutput {
|
||||||
@Override
|
@Override
|
||||||
public void write(OutputStream os) throws IOException {
|
public void write(OutputStream os) throws IOException {
|
||||||
IOUtils.skipFully(is, offset);
|
IOUtils.skipFully(is, offset);
|
||||||
|
long bytes = 0L;
|
||||||
if (len == -1) {
|
if (len == -1) {
|
||||||
IOUtils.copyBytes(is, os, 4096, true);
|
// Use the configured buffer size instead of hardcoding to 4k
|
||||||
|
bytes = FSOperations.copyBytes(is, os);
|
||||||
} else {
|
} else {
|
||||||
IOUtils.copyBytes(is, os, len, true);
|
bytes = FSOperations.copyBytes(is, os, len);
|
||||||
|
}
|
||||||
|
// Update metrics.
|
||||||
|
HttpFSServerMetrics metrics = HttpFSServerWebApp.get().getMetrics();
|
||||||
|
if (metrics != null) {
|
||||||
|
metrics.incrBytesRead(bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ import java.nio.charset.Charset;
|
||||||
import java.text.MessageFormat;
|
import java.text.MessageFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -85,6 +86,7 @@ import org.eclipse.jetty.webapp.WebAppContext;
|
||||||
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import javax.ws.rs.HttpMethod;
|
import javax.ws.rs.HttpMethod;
|
||||||
|
@ -97,6 +99,23 @@ import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
||||||
*/
|
*/
|
||||||
public class TestHttpFSServer extends HFSTestCase {
|
public class TestHttpFSServer extends HFSTestCase {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* define metric getters for unit tests.
|
||||||
|
*/
|
||||||
|
private static Callable<Long> defaultEntryMetricGetter = () -> 0L;
|
||||||
|
private static Callable<Long> defaultExitMetricGetter = () -> 1L;
|
||||||
|
private static HashMap<String, Callable<Long>> metricsGetter =
|
||||||
|
new HashMap<String, Callable<Long>>() {
|
||||||
|
{
|
||||||
|
put("LISTSTATUS",
|
||||||
|
() -> HttpFSServerWebApp.get().getMetrics().getOpsListing());
|
||||||
|
put("MKDIRS",
|
||||||
|
() -> HttpFSServerWebApp.get().getMetrics().getOpsMkdir());
|
||||||
|
put("GETFILESTATUS",
|
||||||
|
() -> HttpFSServerWebApp.get().getMetrics().getOpsStat());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@TestDir
|
@TestDir
|
||||||
@TestJetty
|
@TestJetty
|
||||||
|
@ -361,7 +380,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
@TestHdfs
|
@TestHdfs
|
||||||
public void testHdfsAccess() throws Exception {
|
public void testHdfsAccess() throws Exception {
|
||||||
createHttpFSServer(false, false);
|
createHttpFSServer(false, false);
|
||||||
|
long oldOpsListStatus =
|
||||||
|
metricsGetter.get("LISTSTATUS").call();
|
||||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||||
URL url = new URL(TestJettyHelper.getJettyURL(),
|
URL url = new URL(TestJettyHelper.getJettyURL(),
|
||||||
MessageFormat.format("/webhdfs/v1/?user.name={0}&op=liststatus",
|
MessageFormat.format("/webhdfs/v1/?user.name={0}&op=liststatus",
|
||||||
|
@ -372,6 +392,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
new InputStreamReader(conn.getInputStream()));
|
new InputStreamReader(conn.getInputStream()));
|
||||||
reader.readLine();
|
reader.readLine();
|
||||||
reader.close();
|
reader.close();
|
||||||
|
Assert.assertEquals(1 + oldOpsListStatus,
|
||||||
|
(long) metricsGetter.get("LISTSTATUS").call());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -380,7 +402,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
@TestHdfs
|
@TestHdfs
|
||||||
public void testMkdirs() throws Exception {
|
public void testMkdirs() throws Exception {
|
||||||
createHttpFSServer(false, false);
|
createHttpFSServer(false, false);
|
||||||
|
long oldMkdirOpsStat =
|
||||||
|
metricsGetter.get("MKDIRS").call();
|
||||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||||
URL url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
|
URL url = new URL(TestJettyHelper.getJettyURL(), MessageFormat.format(
|
||||||
"/webhdfs/v1/tmp/sub-tmp?user.name={0}&op=MKDIRS", user));
|
"/webhdfs/v1/tmp/sub-tmp?user.name={0}&op=MKDIRS", user));
|
||||||
|
@ -388,8 +411,10 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
conn.setRequestMethod("PUT");
|
conn.setRequestMethod("PUT");
|
||||||
conn.connect();
|
conn.connect();
|
||||||
Assert.assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_OK);
|
Assert.assertEquals(conn.getResponseCode(), HttpURLConnection.HTTP_OK);
|
||||||
|
|
||||||
getStatus("/tmp/sub-tmp", "LISTSTATUS");
|
getStatus("/tmp/sub-tmp", "LISTSTATUS");
|
||||||
|
long opsStat =
|
||||||
|
metricsGetter.get("MKDIRS").call();
|
||||||
|
Assert.assertEquals(1 + oldMkdirOpsStat, opsStat);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -398,7 +423,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
@TestHdfs
|
@TestHdfs
|
||||||
public void testGlobFilter() throws Exception {
|
public void testGlobFilter() throws Exception {
|
||||||
createHttpFSServer(false, false);
|
createHttpFSServer(false, false);
|
||||||
|
long oldOpsListStatus =
|
||||||
|
metricsGetter.get("LISTSTATUS").call();
|
||||||
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
|
FileSystem fs = FileSystem.get(TestHdfsHelper.getHdfsConf());
|
||||||
fs.mkdirs(new Path("/tmp"));
|
fs.mkdirs(new Path("/tmp"));
|
||||||
fs.create(new Path("/tmp/foo.txt")).close();
|
fs.create(new Path("/tmp/foo.txt")).close();
|
||||||
|
@ -413,6 +439,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
new InputStreamReader(conn.getInputStream()));
|
new InputStreamReader(conn.getInputStream()));
|
||||||
reader.readLine();
|
reader.readLine();
|
||||||
reader.close();
|
reader.close();
|
||||||
|
Assert.assertEquals(1 + oldOpsListStatus,
|
||||||
|
(long) metricsGetter.get("LISTSTATUS").call());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -472,6 +500,9 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
*/
|
*/
|
||||||
private void createDirWithHttp(String dirname, String perms,
|
private void createDirWithHttp(String dirname, String perms,
|
||||||
String unmaskedPerms) throws Exception {
|
String unmaskedPerms) throws Exception {
|
||||||
|
// get the createDirMetrics
|
||||||
|
long oldOpsMkdir =
|
||||||
|
metricsGetter.get("MKDIRS").call();
|
||||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||||
// Remove leading / from filename
|
// Remove leading / from filename
|
||||||
if (dirname.charAt(0) == '/') {
|
if (dirname.charAt(0) == '/') {
|
||||||
|
@ -495,6 +526,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
conn.setRequestMethod("PUT");
|
conn.setRequestMethod("PUT");
|
||||||
conn.connect();
|
conn.connect();
|
||||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||||
|
Assert.assertEquals(1 + oldOpsMkdir,
|
||||||
|
(long) metricsGetter.get("MKDIRS").call());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -508,6 +541,8 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
*/
|
*/
|
||||||
private String getStatus(String filename, String command)
|
private String getStatus(String filename, String command)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
long oldOpsStat =
|
||||||
|
metricsGetter.getOrDefault(command, defaultEntryMetricGetter).call();
|
||||||
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
String user = HadoopUsersConfTestHelper.getHadoopUsers()[0];
|
||||||
// Remove leading / from filename
|
// Remove leading / from filename
|
||||||
if (filename.charAt(0) == '/') {
|
if (filename.charAt(0) == '/') {
|
||||||
|
@ -523,7 +558,9 @@ public class TestHttpFSServer extends HFSTestCase {
|
||||||
|
|
||||||
BufferedReader reader =
|
BufferedReader reader =
|
||||||
new BufferedReader(new InputStreamReader(conn.getInputStream()));
|
new BufferedReader(new InputStreamReader(conn.getInputStream()));
|
||||||
|
long opsStat =
|
||||||
|
metricsGetter.getOrDefault(command, defaultExitMetricGetter).call();
|
||||||
|
Assert.assertEquals(oldOpsStat + 1L, opsStat);
|
||||||
return reader.readLine();
|
return reader.readLine();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue