HADOOP-6432. Add Statistics support in FileContext. Contributed by jitendra.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1066282 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jitendra Nath Pandey 2011-02-02 00:11:44 +00:00
parent f593f14bfb
commit 449478c832
6 changed files with 264 additions and 18 deletions

View File

@ -4,6 +4,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
HADOOP-6432. Add Statistics support in FileContext. (jitendra)
HADOOP-6904. Support method based RPC compatiblity. (hairong)
NEW FEATURES

View File

@ -24,7 +24,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.StringTokenizer;
@ -60,9 +60,8 @@ public abstract class AbstractFileSystem {
static final Log LOG = LogFactory.getLog(AbstractFileSystem.class);
/** Recording statistics per a file system class. */
private static final Map<Class<? extends AbstractFileSystem>, Statistics>
STATISTICS_TABLE =
new IdentityHashMap<Class<? extends AbstractFileSystem>, Statistics>();
private static final Map<URI, Statistics>
STATISTICS_TABLE = new HashMap<URI, Statistics>();
/** Cache of constructors for each file system class. */
private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
@ -144,36 +143,69 @@ public static AbstractFileSystem createFileSystem(URI uri, Configuration conf)
}
return (AbstractFileSystem) newInstance(clazz, uri, conf);
}
/**
* Get the statistics for a particular file system.
* @param cls the class to lookup
*
* @param uri
* used as key to lookup STATISTICS_TABLE. Only scheme and authority
* part of the uri are used.
* @return a statistics object
*/
public static synchronized Statistics getStatistics(String scheme,
Class<? extends AbstractFileSystem> cls) {
Statistics result = STATISTICS_TABLE.get(cls);
protected static synchronized Statistics getStatistics(URI uri) {
String scheme = uri.getScheme();
if (scheme == null) {
throw new IllegalArgumentException("Scheme not defined in the uri: "
+ uri);
}
URI baseUri = getBaseUri(uri);
Statistics result = STATISTICS_TABLE.get(baseUri);
if (result == null) {
result = new Statistics(scheme);
STATISTICS_TABLE.put(cls, result);
STATISTICS_TABLE.put(baseUri, result);
}
return result;
}
private static URI getBaseUri(URI uri) {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
String baseUriString = scheme + "://";
if (authority != null) {
baseUriString = baseUriString + authority;
} else {
baseUriString = baseUriString + "/";
}
return URI.create(baseUriString);
}
public static synchronized void clearStatistics() {
for(Statistics stat: STATISTICS_TABLE.values()) {
stat.reset();
}
}
/**
* Prints statistics for all file systems.
*/
public static synchronized void printStatistics() {
for (Map.Entry<Class<? extends AbstractFileSystem>, Statistics> pair:
STATISTICS_TABLE.entrySet()) {
System.out.println(" FileSystem " + pair.getKey().getName() +
": " + pair.getValue());
for (Map.Entry<URI, Statistics> pair : STATISTICS_TABLE.entrySet()) {
System.out.println(" FileSystem " + pair.getKey().getScheme() + "://"
+ pair.getKey().getAuthority() + ": " + pair.getValue());
}
}
protected static synchronized Map<URI, Statistics> getAllStatistics() {
Map<URI, Statistics> statsMap = new HashMap<URI, Statistics>(
STATISTICS_TABLE.size());
for (Map.Entry<URI, Statistics> pair : STATISTICS_TABLE.entrySet()) {
URI key = pair.getKey();
Statistics value = pair.getValue();
Statistics newStatsObj = new Statistics(value);
statsMap.put(URI.create(key.toString()), newStatsObj);
}
return statsMap;
}
/**
* The main factory method for creating a file system. Get a file system for
@ -211,7 +243,7 @@ public AbstractFileSystem(final URI uri, final String supportedScheme,
final boolean authorityNeeded, final int defaultPort)
throws URISyntaxException {
myUri = getUri(uri, supportedScheme, authorityNeeded, defaultPort);
statistics = getStatistics(supportedScheme, getClass());
statistics = getStatistics(uri);
}
/**

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
@ -768,8 +769,8 @@ public Boolean next(final AbstractFileSystem fs, final Path p)
*
* @throws AccessControlException If access is denied
* @throws FileAlreadyExistsException If <code>dst</code> already exists and
* <code>options</options> has {@link Rename#OVERWRITE} option
* false.
* <code>options</options> has {@link Options.Rename#OVERWRITE}
* option false.
* @throws FileNotFoundException If <code>src</code> does not exist
* @throws ParentNotDirectoryException If parent of <code>dst</code> is not a
* directory
@ -2226,4 +2227,40 @@ public T resolve(final FileContext fc, Path p) throws IOException {
return in;
}
}
/**
* Get the statistics for a particular file system
*
* @param uri
* the uri to lookup the statistics. Only scheme and authority part
* of the uri are used as the key to store and lookup.
* @return a statistics object
*/
public static Statistics getStatistics(URI uri) {
return AbstractFileSystem.getStatistics(uri);
}
/**
* Clears all the statistics stored in AbstractFileSystem, for all the file
* systems.
*/
public static void clearStatistics() {
AbstractFileSystem.clearStatistics();
}
/**
* Prints the statistics to standard output. File System is identified by the
* scheme and authority.
*/
public static void printStatistics() {
AbstractFileSystem.printStatistics();
}
/**
* @return Map of uri and statistics for each filesystem instantiated. The uri
* consists of scheme and authority for the filesystem.
*/
public static Map<URI, Statistics> getAllStatistics() {
return AbstractFileSystem.getAllStatistics();
}
}

View File

@ -2007,6 +2007,18 @@ public Statistics(String scheme) {
this.scheme = scheme;
}
/**
* Copy constructor.
*
* @param st
* The input Statistics object which is cloned.
*/
public Statistics(Statistics st) {
this.scheme = st.scheme;
this.bytesRead = new AtomicLong(st.bytesRead.longValue());
this.bytesWritten = new AtomicLong(st.bytesWritten.longValue());
}
/**
* Increment the bytes read in the statistics
* @param newBytes the additional bytes read

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.hadoop.fs.FileContextTestHelper.*;
/**
* <p>
* Base class to test {@link FileContext} Statistics.
* </p>
*/
public abstract class FCStatisticsBaseTest {
static protected int blockSize = 512;
static protected int numBlocks = 1;
//fc should be set appropriately by the deriving test.
protected static FileContext fc = null;
@Test
public void testStatistics() throws IOException, URISyntaxException {
URI fsUri = getFsUri();
Statistics stats = FileContext.getStatistics(fsUri);
Assert.assertEquals(0, stats.getBytesRead());
Path filePath = getTestRootPath(fc, "file1");
createFile(fc, filePath, numBlocks, blockSize);
Assert.assertEquals(0, stats.getBytesRead());
verifyWrittenBytes(stats);
FSDataInputStream fstr = fc.open(filePath);
byte[] buf = new byte[blockSize];
int bytesRead = fstr.read(buf, 0, blockSize);
Assert.assertEquals(blockSize, bytesRead);
verifyReadBytes(stats);
verifyWrittenBytes(stats);
verifyReadBytes(FileContext.getStatistics(getFsUri()));
Map<URI, Statistics> statsMap = FileContext.getAllStatistics();
URI exactUri = getSchemeAuthorityUri();
verifyWrittenBytes(statsMap.get(exactUri));
fc.delete(filePath, true);
}
/**
* Bytes read may be different for different file systems. This method should
* throw assertion error if bytes read are incorrect.
*
* @param stats
*/
protected abstract void verifyReadBytes(Statistics stats);
/**
* Bytes written may be different for different file systems. This method should
* throw assertion error if bytes written are incorrect.
*
* @param stats
*/
protected abstract void verifyWrittenBytes(Statistics stats);
/**
* Returns the filesystem uri. Should be set
* @return URI
*/
protected abstract URI getFsUri();
protected URI getSchemeAuthorityUri() {
URI uri = getFsUri();
String SchemeAuthString = uri.getScheme() + "://";
if (uri.getAuthority() == null) {
SchemeAuthString += "/";
} else {
SchemeAuthString += uri.getAuthority();
}
return URI.create(SchemeAuthString);
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.net.URI;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import static org.apache.hadoop.fs.FileContextTestHelper.*;
/**
* <p>
* Tests the File Context Statistics for {@link LocalFileSystem}
* </p>
*/
public class TestLocalFsFCStatistics extends FCStatisticsBaseTest {
static final String LOCAL_FS_ROOT_URI = "file:///tmp/test";
@Before
public void setUp() throws Exception {
fc = FileContext.getLocalFSFileContext();
fc.mkdir(getTestRootPath(fc, "test"), FileContext.DEFAULT_PERM, true);
}
@After
public void tearDown() throws Exception {
fc.delete(getTestRootPath(fc, "test"), true);
}
protected void verifyReadBytes(Statistics stats) {
Assert.assertEquals(blockSize, stats.getBytesRead());
}
protected void verifyWrittenBytes(Statistics stats) {
//Extra 12 bytes are written apart from the block.
Assert.assertEquals(blockSize + 12, stats.getBytesWritten());
}
protected URI getFsUri() {
return URI.create(LOCAL_FS_ROOT_URI);
}
}