Merge branch 'HDDS-4' into trunk

This commit is contained in:
Xiaoyu Yao 2019-01-18 14:40:21 -08:00
commit 751bc62df7
38 changed files with 889 additions and 224 deletions

View File

@ -14,7 +14,6 @@ Requirements:
Clang (community version), Clang (version for iOS 9 and later) (if compiling native code)
* openssl devel (if compiling native hadoop-pipes and to get the best HDFS encryption performance)
* Linux FUSE (Filesystem in Userspace) version 2.6 or above (if compiling fuse_dfs)
* Jansson C XML parsing library ( if compiling libwebhdfs )
* Doxygen ( if compiling libhdfspp and generating the documents )
* Internet connection for first build (to fetch all Maven and Hadoop dependencies)
* python (for releasedocs)
@ -75,8 +74,6 @@ Optional packages:
(OR https://github.com/01org/isa-l)
* Bzip2
$ sudo apt-get install bzip2 libbz2-dev
* Jansson (C Library for JSON)
$ sudo apt-get install libjansson-dev
* Linux FUSE
$ sudo apt-get install fuse libfuse-dev
* ZStandard compression

View File

@ -66,6 +66,8 @@ Java_org_apache_hadoop_io_erasurecode_rawcoder_NativeXORRawDecoder_decodeImpl(
numDataUnits + numParityUnits);
getOutputs(env, outputs, outputOffsets, xorDecoder->outputs, numParityUnits);
memset(xorDecoder->outputs[0], 0, chunkSize);
for (i = 0; i < numDataUnits + numParityUnits; i++) {
if (xorDecoder->inputs[i] == NULL) {
continue;

View File

@ -133,11 +133,20 @@ public class XceiverClientManager implements Closeable {
* Releases a XceiverClientSpi after use.
*
* @param client client to release
* @param invalidateClient if true, invalidates the client in cache
*/
public void releaseClient(XceiverClientSpi client) {
public void releaseClient(XceiverClientSpi client, boolean invalidateClient) {
Preconditions.checkNotNull(client);
synchronized (clientCache) {
client.decrementReference();
if (invalidateClient) {
Pipeline pipeline = client.getPipeline();
String key = pipeline.getId().getId().toString() + pipeline.getType();
XceiverClientSpi cachedClient = clientCache.getIfPresent(key);
if (cachedClient == client) {
clientCache.invalidate(key);
}
}
}
}

View File

@ -100,7 +100,7 @@ public class ContainerOperationClient implements ScmClient {
return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
}
}
@ -191,7 +191,7 @@ public class ContainerOperationClient implements ScmClient {
return containerWithPipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
}
}
@ -269,7 +269,7 @@ public class ContainerOperationClient implements ScmClient {
}
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
}
}
@ -319,7 +319,7 @@ public class ContainerOperationClient implements ScmClient {
return response.getContainerData();
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
}
}
@ -412,7 +412,7 @@ public class ContainerOperationClient implements ScmClient {
ObjectStageChangeRequestProto.Stage.complete);
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
}
}

View File

@ -141,7 +141,7 @@ public class BlockInputStream extends InputStream implements Seekable {
@Override
public synchronized void close() {
if (xceiverClientManager != null && xceiverClient != null) {
xceiverClientManager.releaseClient(xceiverClient);
xceiverClientManager.releaseClient(xceiverClient, false);
xceiverClientManager = null;
xceiverClient = null;
}

View File

@ -21,6 +21,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
@ -113,7 +114,7 @@ public class BlockOutputStream extends OutputStream {
* @param blockID block ID
* @param key chunk key
* @param xceiverClientManager client manager that controls client
* @param xceiverClient client to perform container calls
* @param pipeline pipeline where block will be written
* @param traceID container protocol call args
* @param chunkSize chunk size
* @param bufferList list of byte buffers
@ -124,10 +125,10 @@ public class BlockOutputStream extends OutputStream {
*/
@SuppressWarnings("parameternumber")
public BlockOutputStream(BlockID blockID, String key,
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
String traceID, int chunkSize, long streamBufferFlushSize,
long streamBufferMaxSize, long watchTimeout,
List<ByteBuffer> bufferList, Checksum checksum) {
long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
Checksum checksum) throws IOException {
this.blockID = blockID;
this.key = key;
this.traceID = traceID;
@ -138,7 +139,7 @@ public class BlockOutputStream extends OutputStream {
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.streamId = UUID.randomUUID().toString();
this.chunkIndex = 0;
this.streamBufferFlushSize = streamBufferFlushSize;
@ -500,7 +501,7 @@ public class BlockOutputStream extends OutputStream {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
} finally {
cleanup();
cleanup(false);
}
}
// clear the currentBuffer
@ -541,9 +542,9 @@ public class BlockOutputStream extends OutputStream {
}
}
public void cleanup() {
public void cleanup(boolean invalidateClient) {
if (xceiverClientManager != null) {
xceiverClientManager.releaseClient(xceiverClient);
xceiverClientManager.releaseClient(xceiverClient, invalidateClient);
}
xceiverClientManager = null;
xceiverClient = null;

View File

@ -256,21 +256,8 @@ public final class ContainerDataYaml {
kvData.setMetadata(meta);
kvData.setChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
String state = (String) nodes.get(OzoneConsts.STATE);
switch (state) {
case "OPEN":
kvData.setState(ContainerProtos.ContainerDataProto.State.OPEN);
break;
case "CLOSING":
kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSING);
break;
case "CLOSED":
kvData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
break;
default:
throw new IllegalStateException("Unexpected " +
"ContainerLifeCycleState " + state + " for the containerId " +
nodes.get(OzoneConsts.CONTAINER_ID));
}
kvData
.setState(ContainerProtos.ContainerDataProto.State.valueOf(state));
return kvData;
}
}

View File

@ -96,8 +96,8 @@ public class ContainerReader implements Runnable {
try {
readVolume(hddsVolumeDir);
} catch (RuntimeException ex) {
LOG.info("Caught an Run time exception during reading container files" +
" from Volume {}", hddsVolumeDir);
LOG.error("Caught a Run time exception during reading container files" +
" from Volume {} {}", hddsVolumeDir, ex);
}
}

View File

@ -41,12 +41,15 @@ public class DFSOpsCountStatistics extends StorageStatistics {
/** This is for counting distributed file system operations. */
public enum OpType {
ADD_CACHE_DIRECTIVE("op_add_cache_directive"),
ADD_CACHE_POOL("op_add_cache_pool"),
ADD_EC_POLICY("op_add_ec_policy"),
ALLOW_SNAPSHOT("op_allow_snapshot"),
APPEND(CommonStatisticNames.OP_APPEND),
CONCAT("op_concat"),
COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE),
CREATE(CommonStatisticNames.OP_CREATE),
CREATE_ENCRYPTION_ZONE("op_create_encryption_zone"),
CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE),
CREATE_SNAPSHOT("op_create_snapshot"),
CREATE_SYM_LINK("op_create_symlink"),
@ -61,6 +64,7 @@ public class DFSOpsCountStatistics extends StorageStatistics {
GET_EC_CODECS("op_get_ec_codecs"),
GET_EC_POLICY("op_get_ec_policy"),
GET_EC_POLICIES("op_get_ec_policies"),
GET_ENCRYPTION_ZONE("op_get_encryption_zone"),
GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations"),
GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM),
GET_FILE_LINK_STATUS("op_get_file_link_status"),
@ -72,8 +76,13 @@ public class DFSOpsCountStatistics extends StorageStatistics {
GET_STORAGE_POLICY("op_get_storage_policy"),
GET_TRASH_ROOT("op_get_trash_root"),
GET_XATTR("op_get_xattr"),
LIST_CACHE_DIRECTIVE("op_list_cache_directive"),
LIST_CACHE_POOL("op_list_cache_pool"),
LIST_ENCRYPTION_ZONE("op_list_encryption_zone"),
LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS),
LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS),
MODIFY_CACHE_POOL("op_modify_cache_pool"),
MODIFY_CACHE_DIRECTIVE("op_modify_cache_directive"),
MKDIRS(CommonStatisticNames.OP_MKDIRS),
MODIFY_ACL_ENTRIES(CommonStatisticNames.OP_MODIFY_ACL_ENTRIES),
OPEN(CommonStatisticNames.OP_OPEN),
@ -81,16 +90,21 @@ public class DFSOpsCountStatistics extends StorageStatistics {
PRIMITIVE_MKDIR("op_primitive_mkdir"),
REMOVE_ACL(CommonStatisticNames.OP_REMOVE_ACL),
REMOVE_ACL_ENTRIES(CommonStatisticNames.OP_REMOVE_ACL_ENTRIES),
REMOVE_CACHE_DIRECTIVE("op_remove_cache_directive"),
REMOVE_CACHE_POOL("op_remove_cache_pool"),
REMOVE_DEFAULT_ACL(CommonStatisticNames.OP_REMOVE_DEFAULT_ACL),
REMOVE_EC_POLICY("op_remove_ec_policy"),
REMOVE_XATTR("op_remove_xattr"),
RENAME(CommonStatisticNames.OP_RENAME),
RENAME_SNAPSHOT("op_rename_snapshot"),
RESOLVE_LINK("op_resolve_link"),
SATISFY_STORAGE_POLICY("op_satisfy_storagepolicy"),
SET_ACL(CommonStatisticNames.OP_SET_ACL),
SET_EC_POLICY("op_set_ec_policy"),
SET_OWNER(CommonStatisticNames.OP_SET_OWNER),
SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION),
SET_QUOTA_BYTSTORAGEYPE("op_set_quota_bystoragetype"),
SET_QUOTA_USAGE("op_set_quota_usage"),
SET_REPLICATION("op_set_replication"),
SET_STORAGE_POLICY("op_set_storagePolicy"),
SET_TIMES(CommonStatisticNames.OP_SET_TIMES),

View File

@ -1002,6 +1002,8 @@ public class DistributedFileSystem extends FileSystem
*/
public void setQuota(Path src, final long namespaceQuota,
final long storagespaceQuota) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_QUOTA_USAGE);
Path absF = fixRelativePart(src);
new FileSystemLinkResolver<Void>() {
@Override
@ -1030,6 +1032,8 @@ public class DistributedFileSystem extends FileSystem
public void setQuotaByStorageType(Path src, final StorageType type,
final long quota)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_QUOTA_BYTSTORAGEYPE);
Path absF = fixRelativePart(src);
new FileSystemLinkResolver<Void>() {
@Override
@ -2222,6 +2226,8 @@ public class DistributedFileSystem extends FileSystem
*/
public long addCacheDirective(
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.ADD_CACHE_DIRECTIVE);
Preconditions.checkNotNull(info.getPath());
Path path = new Path(getPathName(fixRelativePart(info.getPath()))).
makeQualified(getUri(), getWorkingDirectory());
@ -2249,6 +2255,8 @@ public class DistributedFileSystem extends FileSystem
*/
public void modifyCacheDirective(
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MODIFY_CACHE_DIRECTIVE);
if (info.getPath() != null) {
info = new CacheDirectiveInfo.Builder(info).
setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
@ -2265,6 +2273,8 @@ public class DistributedFileSystem extends FileSystem
*/
public void removeCacheDirective(long id)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_CACHE_DIRECTIVE);
dfs.removeCacheDirective(id);
}
@ -2277,6 +2287,8 @@ public class DistributedFileSystem extends FileSystem
*/
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
CacheDirectiveInfo filter) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_CACHE_DIRECTIVE);
if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build();
}
@ -2317,6 +2329,8 @@ public class DistributedFileSystem extends FileSystem
* If the request could not be completed.
*/
public void addCachePool(CachePoolInfo info) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.ADD_CACHE_POOL);
CachePoolInfo.validate(info);
dfs.addCachePool(info);
}
@ -2330,6 +2344,8 @@ public class DistributedFileSystem extends FileSystem
* If the request could not be completed.
*/
public void modifyCachePool(CachePoolInfo info) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MODIFY_CACHE_POOL);
CachePoolInfo.validate(info);
dfs.modifyCachePool(info);
}
@ -2343,6 +2359,8 @@ public class DistributedFileSystem extends FileSystem
* if the cache pool did not exist, or could not be removed.
*/
public void removeCachePool(String poolName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_CACHE_POOL);
CachePoolInfo.validateName(poolName);
dfs.removeCachePool(poolName);
}
@ -2356,6 +2374,8 @@ public class DistributedFileSystem extends FileSystem
* If there was an error listing cache pools.
*/
public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_CACHE_POOL);
return dfs.listCachePools();
}
@ -2497,6 +2517,8 @@ public class DistributedFileSystem extends FileSystem
/* HDFS only */
public void createEncryptionZone(final Path path, final String keyName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_ENCRYPTION_ZONE);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -2524,6 +2546,8 @@ public class DistributedFileSystem extends FileSystem
/* HDFS only */
public EncryptionZone getEZForPath(final Path path)
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_ENCRYPTION_ZONE);
Preconditions.checkNotNull(path);
Path absF = fixRelativePart(path);
return new FileSystemLinkResolver<EncryptionZone>() {
@ -2551,6 +2575,8 @@ public class DistributedFileSystem extends FileSystem
/* HDFS only */
public RemoteIterator<EncryptionZone> listEncryptionZones()
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_ENCRYPTION_ZONE);
return dfs.listEncryptionZones();
}
@ -2875,6 +2901,8 @@ public class DistributedFileSystem extends FileSystem
* @throws IOException
*/
public void satisfyStoragePolicy(final Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SATISFY_STORAGE_POLICY);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {

View File

@ -37,6 +37,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
@ -46,13 +47,17 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -78,10 +83,13 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -775,6 +783,119 @@ public class TestDistributedFileSystem {
}
}
@Test
public void testStatistics2() throws IOException, NoSuchAlgorithmException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString());
File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
Path dir = new Path("/testStat");
dfs.mkdirs(dir);
int readOps = 0;
int writeOps = 0;
FileSystem.clearStatistics();
// Quota Commands.
long opCount = getOpStatistics(OpType.SET_QUOTA_USAGE);
dfs.setQuota(dir, 100, 1000);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.SET_QUOTA_USAGE, opCount + 1);
opCount = getOpStatistics(OpType.SET_QUOTA_BYTSTORAGEYPE);
dfs.setQuotaByStorageType(dir, StorageType.DEFAULT, 2000);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.SET_QUOTA_BYTSTORAGEYPE, opCount + 1);
opCount = getOpStatistics(OpType.GET_QUOTA_USAGE);
dfs.getQuotaUsage(dir);
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.GET_QUOTA_USAGE, opCount + 1);
// Satisfy Storage Policy.
opCount = getOpStatistics(OpType.SATISFY_STORAGE_POLICY);
dfs.satisfyStoragePolicy(dir);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.SATISFY_STORAGE_POLICY, opCount + 1);
// Cache Commands.
CachePoolInfo cacheInfo =
new CachePoolInfo("pool1").setMode(new FsPermission((short) 0));
opCount = getOpStatistics(OpType.ADD_CACHE_POOL);
dfs.addCachePool(cacheInfo);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.ADD_CACHE_POOL, opCount + 1);
CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder()
.setPath(new Path(".")).setPool("pool1").build();
opCount = getOpStatistics(OpType.ADD_CACHE_DIRECTIVE);
long id = dfs.addCacheDirective(directive);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.ADD_CACHE_DIRECTIVE, opCount + 1);
opCount = getOpStatistics(OpType.LIST_CACHE_DIRECTIVE);
dfs.listCacheDirectives(null);
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.LIST_CACHE_DIRECTIVE, opCount + 1);
opCount = getOpStatistics(OpType.MODIFY_CACHE_DIRECTIVE);
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
.setReplication((short) 2).build());
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.MODIFY_CACHE_DIRECTIVE, opCount + 1);
opCount = getOpStatistics(OpType.REMOVE_CACHE_DIRECTIVE);
dfs.removeCacheDirective(id);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.REMOVE_CACHE_DIRECTIVE, opCount + 1);
opCount = getOpStatistics(OpType.MODIFY_CACHE_POOL);
dfs.modifyCachePool(cacheInfo);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.MODIFY_CACHE_POOL, opCount + 1);
opCount = getOpStatistics(OpType.LIST_CACHE_POOL);
dfs.listCachePools();
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.LIST_CACHE_POOL, opCount + 1);
opCount = getOpStatistics(OpType.REMOVE_CACHE_POOL);
dfs.removeCachePool(cacheInfo.getPoolName());
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.REMOVE_CACHE_POOL, opCount + 1);
// Crypto Commands.
final KeyProvider provider =
cluster.getNameNode().getNamesystem().getProvider();
final KeyProvider.Options options = KeyProvider.options(conf);
provider.createKey("key", options);
provider.flush();
opCount = getOpStatistics(OpType.CREATE_ENCRYPTION_ZONE);
dfs.createEncryptionZone(dir, "key");
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.CREATE_ENCRYPTION_ZONE, opCount + 1);
opCount = getOpStatistics(OpType.LIST_ENCRYPTION_ZONE);
dfs.listEncryptionZones();
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.LIST_ENCRYPTION_ZONE, opCount + 1);
opCount = getOpStatistics(OpType.GET_ENCRYPTION_ZONE);
dfs.getEZForPath(dir);
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.GET_ENCRYPTION_ZONE, opCount + 1);
}
}
@Test
public void testECStatistics() throws IOException {
try (MiniDFSCluster cluster =

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.common.Checksum;
@ -40,7 +41,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private BlockID blockID;
private final String key;
private final XceiverClientManager xceiverClientManager;
private final XceiverClientSpi xceiverClient;
private final Pipeline pipeline;
private final Checksum checksum;
private final String requestId;
private final int chunkSize;
@ -57,7 +58,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
Pipeline pipeline, String requestId, int chunkSize,
long length, long streamBufferFlushSize, long streamBufferMaxSize,
long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum,
Token<OzoneBlockTokenIdentifier> token) {
@ -65,7 +66,7 @@ public class BlockOutputStreamEntry extends OutputStream {
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.pipeline = pipeline;
this.requestId = requestId;
this.chunkSize = chunkSize;
this.token = token;
@ -78,31 +79,6 @@ public class BlockOutputStreamEntry extends OutputStream {
this.checksum = checksum;
}
/**
* For testing purpose, taking a some random created stream instance.
*
* @param outputStream a existing writable output stream
* @param length the length of data to write to the stream
*/
BlockOutputStreamEntry(OutputStream outputStream, long length,
Checksum checksum) {
this.outputStream = outputStream;
this.blockID = null;
this.key = null;
this.xceiverClientManager = null;
this.xceiverClient = null;
this.requestId = null;
this.chunkSize = -1;
this.token = null;
this.length = length;
this.currentPosition = 0;
streamBufferFlushSize = 0;
streamBufferMaxSize = 0;
bufferList = null;
watchTimeout = 0;
this.checksum = checksum;
}
long getLength() {
return length;
}
@ -115,6 +91,12 @@ public class BlockOutputStreamEntry extends OutputStream {
return length - currentPosition;
}
/**
* BlockOutputStream is initialized in this function. This makes sure that
* xceiverClient initialization is not done during preallocation and only
* done when data is written.
* @throws IOException if xceiverClient initialization fails
*/
private void checkStream() throws IOException {
if (this.outputStream == null) {
if (getToken() != null) {
@ -122,11 +104,12 @@ public class BlockOutputStreamEntry extends OutputStream {
}
this.outputStream =
new BlockOutputStream(blockID, key, xceiverClientManager,
xceiverClient, requestId, chunkSize, streamBufferFlushSize,
pipeline, requestId, chunkSize, streamBufferFlushSize,
streamBufferMaxSize, watchTimeout, bufferList, checksum);
}
}
@Override
public void write(int b) throws IOException {
checkStream();
@ -187,11 +170,11 @@ public class BlockOutputStreamEntry extends OutputStream {
throw new IOException("Invalid Output Stream for Key: " + key);
}
void cleanup() throws IOException{
void cleanup(boolean invalidateClient) throws IOException {
checkStream();
if (this.outputStream instanceof BlockOutputStream) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
out.cleanup();
out.cleanup(invalidateClient);
}
}
@ -214,7 +197,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private BlockID blockID;
private String key;
private XceiverClientManager xceiverClientManager;
private XceiverClientSpi xceiverClient;
private Pipeline pipeline;
private String requestId;
private int chunkSize;
private long length;
@ -246,8 +229,8 @@ public class BlockOutputStreamEntry extends OutputStream {
return this;
}
public Builder setXceiverClient(XceiverClientSpi client) {
this.xceiverClient = client;
public Builder setPipeline(Pipeline pipeline) {
this.pipeline = pipeline;
return this;
}
@ -293,7 +276,7 @@ public class BlockOutputStreamEntry extends OutputStream {
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID, key,
xceiverClientManager, xceiverClient, requestId, chunkSize,
xceiverClientManager, pipeline, requestId, chunkSize,
length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
bufferList, checksum, token);
}
@ -315,8 +298,8 @@ public class BlockOutputStreamEntry extends OutputStream {
return xceiverClientManager;
}
public XceiverClientSpi getXceiverClient() {
return xceiverClient;
public Pipeline getPipeline() {
return pipeline;
}
public Checksum getChecksum() {

View File

@ -316,7 +316,7 @@ public class KeyInputStream extends InputStream implements Seekable {
omKeyLocationInfo.getLength());
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient);
xceiverClientManager.releaseClient(xceiverClient, false);
}
}
}

View File

@ -21,22 +21,21 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.om.helpers.*;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.RaftRetryFailureException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -108,19 +107,6 @@ public class KeyOutputStream extends OutputStream {
this.checksum = new Checksum();
}
/**
* For testing purpose only. Not building output stream from blocks, but
* taking from externally.
*
* @param outputStream
* @param length
*/
@VisibleForTesting
public void addStream(OutputStream outputStream, long length) {
streamEntries.add(
new BlockOutputStreamEntry(outputStream, length, checksum));
}
@VisibleForTesting
public List<BlockOutputStreamEntry> getStreamEntries() {
return streamEntries;
@ -223,7 +209,7 @@ public class KeyOutputStream extends OutputStream {
.setBlockID(subKeyInfo.getBlockID())
.setKey(keyArgs.getKeyName())
.setXceiverClientManager(xceiverClientManager)
.setXceiverClient(xceiverClient)
.setPipeline(containerWithPipeline.getPipeline())
.setRequestId(requestID)
.setChunkSize(chunkSize)
.setLength(subKeyInfo.getLength())
@ -311,12 +297,14 @@ public class KeyOutputStream extends OutputStream {
current.write(b, off, writeLen);
}
} catch (IOException ioe) {
if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
boolean retryFailure = checkForRetryFailure(ioe);
if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)
|| retryFailure) {
// for the current iteration, totalDataWritten - currentPos gives the
// amount of data already written to the buffer
writeLen = (int) (current.getWrittenDataLength() - currentPos);
LOG.debug("writeLen {}, total len {}", writeLen, len);
handleException(current, currentStreamIndex);
handleException(current, currentStreamIndex, retryFailure);
} else {
throw ioe;
}
@ -376,17 +364,19 @@ public class KeyOutputStream extends OutputStream {
*
* @param streamEntry StreamEntry
* @param streamIndex Index of the entry
* @param retryFailure if true the xceiverClient needs to be invalidated in
* the client cache.
* @throws IOException Throws IOException if Write fails
*/
private void handleException(BlockOutputStreamEntry streamEntry,
int streamIndex) throws IOException {
int streamIndex, boolean retryFailure) throws IOException {
long totalSuccessfulFlushedData =
streamEntry.getTotalSuccessfulFlushedData();
//set the correct length for the current stream
streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
long bufferedDataLen = computeBufferData();
// just clean up the current stream.
streamEntry.cleanup();
streamEntry.cleanup(retryFailure);
if (bufferedDataLen > 0) {
// If the data is still cached in the underlying stream, we need to
// allocate new block and write this data in the datanode.
@ -404,7 +394,7 @@ public class KeyOutputStream extends OutputStream {
private boolean checkIfContainerIsClosed(IOException ioe) {
if (ioe.getCause() != null) {
return checkIfContainerNotOpenOrRaftRetryFailureException(ioe) || Optional
return checkForException(ioe, ContainerNotOpenException.class) || Optional
.of(ioe.getCause())
.filter(e -> e instanceof StorageContainerException)
.map(e -> (StorageContainerException) e)
@ -414,13 +404,23 @@ public class KeyOutputStream extends OutputStream {
return false;
}
private boolean checkIfContainerNotOpenOrRaftRetryFailureException(
IOException ioe) {
/**
* Checks if the provided exception signifies retry failure in ratis client.
* In case of retry failure, ratis client throws RaftRetryFailureException
* and all succeeding operations are failed with AlreadyClosedException.
*/
private boolean checkForRetryFailure(IOException ioe) {
return checkForException(ioe, RaftRetryFailureException.class,
AlreadyClosedException.class);
}
private boolean checkForException(IOException ioe, Class... classes) {
Throwable t = ioe.getCause();
while (t != null) {
if (t instanceof ContainerNotOpenException
|| t instanceof RaftRetryFailureException) {
return true;
for (Class cls : classes) {
if (cls.isInstance(t)) {
return true;
}
}
t = t.getCause();
}
@ -483,11 +483,13 @@ public class KeyOutputStream extends OutputStream {
entry.flush();
}
} catch (IOException ioe) {
if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
boolean retryFailure = checkForRetryFailure(ioe);
if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)
|| retryFailure) {
// This call will allocate a new streamEntry and write the Data.
// Close needs to be retried on the newly allocated streamEntry as
// as well.
handleException(entry, streamIndex);
handleException(entry, streamIndex, retryFailure);
handleFlushOrClose(close);
} else {
throw ioe;

View File

@ -119,6 +119,6 @@ public class TestContainerStateMachineIdempotency {
} catch (IOException ioe) {
Assert.fail("Container operation failed" + ioe);
}
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
}

View File

@ -729,7 +729,7 @@ public abstract class TestOzoneRpcClientAbstract {
Assert.assertTrue(
e.getMessage().contains("on the pipeline " + pipeline.getId()));
}
manager.releaseClient(clientSpi);
manager.releaseClient(clientSpi, false);
}
private void readKey(OzoneBucket bucket, String keyName, String data)

View File

@ -98,7 +98,7 @@ public class TestContainerSmallFile {
ContainerProtocolCalls.readSmallFile(client, blockID, traceID);
String readData = response.getData().getData().toStringUtf8();
Assert.assertEquals("data123", readData);
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
@Test
@ -121,7 +121,7 @@ public class TestContainerSmallFile {
// Try to read a Key Container Name
ContainerProtos.GetSmallFileResponseProto response =
ContainerProtocolCalls.readSmallFile(client, blockID, traceID);
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
@Test
@ -149,7 +149,7 @@ public class TestContainerSmallFile {
ContainerProtocolCalls.readSmallFile(client,
ContainerTestHelper.getTestBlockID(
nonExistContainerID), traceID);
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
@Test
@ -202,7 +202,7 @@ public class TestContainerSmallFile {
ContainerProtocolCalls.readSmallFile(client, blockID1, traceID);
String readData = response.getData().getData().toStringUtf8();
Assert.assertEquals("data123", readData);
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
}

View File

@ -114,7 +114,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
Assert.assertTrue(
BlockID.getFromProtobuf(response.getBlockID()).equals(blockID));
Assert.assertTrue(response.getBlockLength() == data.length);
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
@Test
@ -139,7 +139,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
} catch (StorageContainerException sce) {
Assert.assertTrue(sce.getMessage().contains("Unable to find the block"));
}
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
@Test
@ -180,6 +180,6 @@ public class TestGetCommittedBlockLengthAndPutKey {
// This will also ensure that closing the container committed the block
// on the Datanodes.
Assert.assertEquals(responseBlockID, blockID);
xceiverClientManager.releaseClient(client);
xceiverClientManager.releaseClient(client, false);
}
}

View File

@ -103,9 +103,9 @@ public class TestXceiverClientManager {
Assert.assertEquals(2, client3.getRefcount());
Assert.assertEquals(2, client1.getRefcount());
Assert.assertEquals(client1, client3);
clientManager.releaseClient(client1);
clientManager.releaseClient(client2);
clientManager.releaseClient(client3);
clientManager.releaseClient(client1, false);
clientManager.releaseClient(client2, false);
clientManager.releaseClient(client3, false);
}
@Test
@ -150,7 +150,7 @@ public class TestXceiverClientManager {
// After releasing the client, this connection should be closed
// and any container operations should fail
clientManager.releaseClient(client1);
clientManager.releaseClient(client1, false);
String expectedMessage = "This channel is not connected.";
try {
@ -162,7 +162,7 @@ public class TestXceiverClientManager {
Assert.assertEquals(e.getClass(), IOException.class);
Assert.assertTrue(e.getMessage().contains(expectedMessage));
}
clientManager.releaseClient(client2);
clientManager.releaseClient(client2, false);
}
@Test
@ -184,7 +184,7 @@ public class TestXceiverClientManager {
.acquireClient(container1.getPipeline());
Assert.assertEquals(1, client1.getRefcount());
clientManager.releaseClient(client1);
clientManager.releaseClient(client1, false);
Assert.assertEquals(0, client1.getRefcount());
ContainerWithPipeline container2 = storageContainerLocationClient
@ -213,6 +213,44 @@ public class TestXceiverClientManager {
Assert.assertEquals(e.getClass(), IOException.class);
Assert.assertTrue(e.getMessage().contains(expectedMessage));
}
clientManager.releaseClient(client2);
clientManager.releaseClient(client2, false);
}
@Test
public void testFreeByRetryFailure() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<String, XceiverClientSpi> cache =
clientManager.getClientCache();
// client is added in cache
ContainerWithPipeline container1 = storageContainerLocationClient
.allocateContainer(clientManager.getType(), clientManager.getFactor(),
containerOwner);
XceiverClientSpi client1 =
clientManager.acquireClient(container1.getPipeline());
clientManager.acquireClient(container1.getPipeline());
Assert.assertEquals(2, client1.getRefcount());
// client should be invalidated in the cache
clientManager.releaseClient(client1, true);
Assert.assertEquals(1, client1.getRefcount());
Assert.assertNull(cache.getIfPresent(
container1.getContainerInfo().getPipelineID().getId().toString()
+ container1.getContainerInfo().getReplicationType()));
// new client should be added in cache
XceiverClientSpi client2 =
clientManager.acquireClient(container1.getPipeline());
Assert.assertNotEquals(client1, client2);
Assert.assertEquals(1, client2.getRefcount());
// on releasing the old client the entry in cache should not be invalidated
clientManager.releaseClient(client1, true);
Assert.assertEquals(0, client1.getRefcount());
Assert.assertNotNull(cache.getIfPresent(
container1.getContainerInfo().getPipelineID().getId().toString()
+ container1.getContainerInfo().getReplicationType()));
}
}

View File

@ -46,17 +46,16 @@ import org.apache.hadoop.ozone.s3.endpoint.MultiDeleteResponse.DeletedObject;
import org.apache.hadoop.ozone.s3.endpoint.MultiDeleteResponse.Error;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.util.ContinueToken;
import org.apache.hadoop.ozone.s3.util.S3StorageType;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.ozone.s3.util.S3StorageType;
import org.apache.hadoop.ozone.s3.util.S3utils;
import static org.apache.hadoop.ozone.s3.util.S3Consts.ENCODING_TYPE;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.s3.util.S3Consts.ENCODING_TYPE;
/**
* Bucket level rest endpoints.
*/
@ -104,16 +103,17 @@ public class BucketEndpoint extends EndpointBase {
Iterator<? extends OzoneKey> ozoneKeyIterator;
String decodedToken = S3utils.decodeContinueToken(continueToken);
ContinueToken decodedToken =
ContinueToken.decodeFromString(continueToken);
if (startAfter != null && continueToken != null) {
// If continuation token and start after both are provided, then we
// ignore start After
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken);
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken.getLastKey());
} else if (startAfter != null && continueToken == null) {
ozoneKeyIterator = bucket.listKeys(prefix, startAfter);
} else if (startAfter == null && continueToken != null){
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken);
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken.getLastKey());
} else {
ozoneKeyIterator = bucket.listKeys(prefix);
}
@ -130,6 +130,9 @@ public class BucketEndpoint extends EndpointBase {
response.setContinueToken(continueToken);
String prevDir = null;
if (continueToken != null) {
prevDir = decodedToken.getLastDir();
}
String lastKey = null;
int count = 0;
while (ozoneKeyIterator.hasNext()) {
@ -176,7 +179,8 @@ public class BucketEndpoint extends EndpointBase {
response.setTruncated(false);
} else if(ozoneKeyIterator.hasNext()) {
response.setTruncated(true);
response.setNextToken(S3utils.generateContinueToken(lastKey));
ContinueToken nextToken = new ContinueToken(lastKey, prevDir);
response.setNextToken(nextToken.encodeToString());
} else {
response.setTruncated(false);
}

View File

@ -60,8 +60,8 @@ import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.hadoop.ozone.s3.io.S3WrapperInputStream;
import org.apache.hadoop.ozone.s3.util.RFC1123Util;
import org.apache.hadoop.ozone.s3.util.RangeHeader;
import org.apache.hadoop.ozone.s3.util.RangeHeaderParserUtil;
import org.apache.hadoop.ozone.s3.util.S3StorageType;
import org.apache.hadoop.ozone.s3.util.S3utils;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.util.Time;
@ -210,7 +210,7 @@ public class ObjectEndpoint extends EndpointBase {
LOG.debug("range Header provided value is {}", rangeHeaderVal);
if (rangeHeaderVal != null) {
rangeHeader = S3utils.parseRangeHeader(rangeHeaderVal,
rangeHeader = RangeHeaderParserUtil.parseRangeHeader(rangeHeaderVal,
length);
LOG.debug("range Header provided value is {}", rangeHeader);
if (rangeHeader.isInValidRange()) {

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.s3.util;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
/**
* Token which holds enough information to continue the key iteration.
*/
public class ContinueToken {
private String lastKey;
private String lastDir;
private static final String CONTINUE_TOKEN_SEPERATOR = "-";
public ContinueToken(String lastKey, String lastDir) {
Preconditions.checkNotNull(lastKey,
"The last key can't be null in the continue token.");
this.lastKey = lastKey;
if (lastDir != null && lastDir.length() > 0) {
this.lastDir = lastDir;
}
}
/**
* Generate a continuation token which is used in get Bucket.
*
* @return if key is not null return continuation token, else returns null.
*/
public String encodeToString() {
if (this.lastKey != null) {
ByteBuffer buffer = ByteBuffer
.allocate(4 + lastKey.length()
+ (lastDir == null ? 0 : lastDir.length()));
buffer.putInt(lastKey.length());
buffer.put(lastKey.getBytes(StandardCharsets.UTF_8));
if (lastDir != null) {
buffer.put(lastDir.getBytes(StandardCharsets.UTF_8));
}
String hex = Hex.encodeHexString(buffer.array());
String digest = DigestUtils.sha256Hex(hex);
return hex + CONTINUE_TOKEN_SEPERATOR + digest;
} else {
return null;
}
}
/**
* Decode a continuation token which is used in get Bucket.
*
* @param key
* @return if key is not null return decoded token, otherwise returns null.
* @throws OS3Exception
*/
public static ContinueToken decodeFromString(String key) throws OS3Exception {
if (key != null) {
int indexSeparator = key.indexOf(CONTINUE_TOKEN_SEPERATOR);
if (indexSeparator == -1) {
throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT, key);
}
String hex = key.substring(0, indexSeparator);
String digest = key.substring(indexSeparator + 1);
try {
checkHash(key, hex, digest);
ByteBuffer buffer = ByteBuffer.wrap(Hex.decodeHex(hex));
int keySize = buffer.getInt();
byte[] actualKeyBytes = new byte[keySize];
buffer.get(actualKeyBytes);
byte[] actualDirBytes = new byte[buffer.remaining()];
buffer.get(actualDirBytes);
return new ContinueToken(
new String(actualKeyBytes, StandardCharsets.UTF_8),
new String(actualDirBytes, StandardCharsets.UTF_8)
);
} catch (DecoderException ex) {
OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
.INVALID_ARGUMENT, key);
os3Exception.setErrorMessage("The continuation token provided is " +
"incorrect");
throw os3Exception;
}
} else {
return null;
}
}
private static void checkHash(String key, String hex, String digest)
throws OS3Exception {
String digestActualKey = DigestUtils.sha256Hex(hex);
if (!digest.equals(digestActualKey)) {
OS3Exception ex = S3ErrorTable.newError(S3ErrorTable
.INVALID_ARGUMENT, key);
ex.setErrorMessage("The continuation token provided is incorrect");
throw ex;
}
}
public String getLastKey() {
return lastKey;
}
public void setLastKey(String lastKey) {
this.lastKey = lastKey;
}
public String getLastDir() {
return lastDir;
}
public void setLastDir(String lastDir) {
this.lastDir = lastDir;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ContinueToken that = (ContinueToken) o;
return lastKey.equals(that.lastKey) &&
Objects.equals(lastDir, that.lastDir);
}
@Override
public int hashCode() {
return Objects.hash(lastKey);
}
@Override
public String toString() {
return "ContinueToken{" +
"lastKey='" + lastKey + '\'' +
", lastDir='" + lastDir + '\'' +
'}';
}
}

View File

@ -18,83 +18,19 @@
*/
package org.apache.hadoop.ozone.s3.util;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import static org.apache.hadoop.ozone.s3.util.S3Consts
.RANGE_HEADER_MATCH_PATTERN;
import org.apache.hadoop.classification.InterfaceAudience;
import static org.apache.hadoop.ozone.s3.util.S3Consts.RANGE_HEADER_MATCH_PATTERN;
/**
* Utility class for S3.
*/
@InterfaceAudience.Private
public final class S3utils {
private S3utils() {
public final class RangeHeaderParserUtil {
private RangeHeaderParserUtil() {
}
private static final String CONTINUE_TOKEN_SEPERATOR = "-";
/**
* Generate a continuation token which is used in get Bucket.
* @param key
* @return if key is not null return continuation token, else returns null.
*/
public static String generateContinueToken(String key) {
if (key != null) {
byte[] byteData = key.getBytes(StandardCharsets.UTF_8);
String hex = Hex.encodeHexString(byteData);
String digest = DigestUtils.sha256Hex(key);
return hex + CONTINUE_TOKEN_SEPERATOR + digest;
} else {
return null;
}
}
/**
* Decode a continuation token which is used in get Bucket.
* @param key
* @return if key is not null return decoded token, otherwise returns null.
* @throws OS3Exception
*/
public static String decodeContinueToken(String key) throws OS3Exception {
if (key != null) {
int indexSeparator = key.indexOf(CONTINUE_TOKEN_SEPERATOR);
if (indexSeparator == -1) {
throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT, key);
}
String hex = key.substring(0, indexSeparator);
String digest = key.substring(indexSeparator + 1);
try {
byte[] actualKeyBytes = Hex.decodeHex(hex);
String digestActualKey = DigestUtils.sha256Hex(actualKeyBytes);
if (digest.equals(digestActualKey)) {
return new String(actualKeyBytes, StandardCharsets.UTF_8);
} else {
OS3Exception ex = S3ErrorTable.newError(S3ErrorTable
.INVALID_ARGUMENT, key);
ex.setErrorMessage("The continuation token provided is incorrect");
throw ex;
}
} catch (DecoderException ex) {
OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
.INVALID_ARGUMENT, key);
os3Exception.setErrorMessage("The continuation token provided is " +
"incorrect");
throw os3Exception;
}
} else {
return null;
}
}
/**
* Parse the rangeHeader and set the start and end offset.

View File

@ -211,6 +211,53 @@ public class TestBucketGet {
}
@Test
public void listWithContinuationTokenDirBreak()
throws OS3Exception, IOException {
BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient =
createClientWithKeys(
"test/dir1/file1",
"test/dir1/file2",
"test/dir1/file3",
"test/dir2/file4",
"test/dir2/file5",
"test/dir2/file6",
"test/dir3/file7",
"test/file8");
getBucket.setClient(ozoneClient);
int maxKeys = 2;
ListObjectResponse getBucketResponse;
getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys,
"test/", null, null, null, null).getEntity();
Assert.assertEquals(0, getBucketResponse.getContents().size());
Assert.assertEquals(2, getBucketResponse.getCommonPrefixes().size());
Assert.assertEquals("test/dir1/",
getBucketResponse.getCommonPrefixes().get(0).getPrefix());
Assert.assertEquals("test/dir2/",
getBucketResponse.getCommonPrefixes().get(1).getPrefix());
getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys,
"test/", null, getBucketResponse.getNextToken(), null, null)
.getEntity();
Assert.assertEquals(1, getBucketResponse.getContents().size());
Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size());
Assert.assertEquals("test/dir3/",
getBucketResponse.getCommonPrefixes().get(0).getPrefix());
Assert.assertEquals("test/file8",
getBucketResponse.getContents().get(0).getKey());
}
@Test
/**
* This test is with prefix and delimiter and verify continuation-token
@ -237,7 +284,6 @@ public class TestBucketGet {
Assert.assertTrue(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2);
// 2nd time
String continueToken = getBucketResponse.getNextToken();
getBucketResponse =
@ -246,7 +292,6 @@ public class TestBucketGet {
Assert.assertTrue(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2);
//3rd time
continueToken = getBucketResponse.getNextToken();
getBucketResponse =

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.s3.util;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.junit.Assert;
import org.junit.Test;
/**
* Test encode/decode of the continue token.
*/
public class TestContinueToken {
@Test
public void encodeDecode() throws OS3Exception {
ContinueToken ct = new ContinueToken("key1", "dir1");
ContinueToken parsedToken =
ContinueToken.decodeFromString(ct.encodeToString());
Assert.assertEquals(ct, parsedToken);
}
@Test
public void encodeDecodeNullDir() throws OS3Exception {
ContinueToken ct = new ContinueToken("key1", null);
ContinueToken parsedToken =
ContinueToken.decodeFromString(ct.encodeToString());
Assert.assertEquals(ct, parsedToken);
}
}

View File

@ -23,9 +23,9 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* Test class to test S3utils.
* Test class to test RangeHeaderParserUtil.
*/
public class TestS3utils {
public class TestRangeHeaderParserUtil {
@Test
public void testRangeHeaderParser() {
@ -34,14 +34,14 @@ public class TestS3utils {
//range is with in file length
rangeHeader = S3utils.parseRangeHeader("bytes=0-8", 10);
rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=0-8", 10);
assertEquals(0, rangeHeader.getStartOffset());
assertEquals(8, rangeHeader.getEndOffset());
assertEquals(false, rangeHeader.isReadFull());
assertEquals(false, rangeHeader.isInValidRange());
//range is with in file length, both start and end offset are same
rangeHeader = S3utils.parseRangeHeader("bytes=0-0", 10);
rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=0-0", 10);
assertEquals(0, rangeHeader.getStartOffset());
assertEquals(0, rangeHeader.getEndOffset());
assertEquals(false, rangeHeader.isReadFull());
@ -49,39 +49,39 @@ public class TestS3utils {
//range is not with in file length, both start and end offset are greater
// than length
rangeHeader = S3utils.parseRangeHeader("bytes=11-10", 10);
rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=11-10", 10);
assertEquals(true, rangeHeader.isInValidRange());
// range is satisfying, one of the range is with in the length. So, read
// full file
rangeHeader = S3utils.parseRangeHeader("bytes=11-8", 10);
rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=11-8", 10);
assertEquals(0, rangeHeader.getStartOffset());
assertEquals(9, rangeHeader.getEndOffset());
assertEquals(true, rangeHeader.isReadFull());
assertEquals(false, rangeHeader.isInValidRange());
// bytes spec is wrong
rangeHeader = S3utils.parseRangeHeader("mb=11-8", 10);
rangeHeader = RangeHeaderParserUtil.parseRangeHeader("mb=11-8", 10);
assertEquals(0, rangeHeader.getStartOffset());
assertEquals(9, rangeHeader.getEndOffset());
assertEquals(true, rangeHeader.isReadFull());
assertEquals(false, rangeHeader.isInValidRange());
// range specified is invalid
rangeHeader = S3utils.parseRangeHeader("bytes=-11-8", 10);
rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=-11-8", 10);
assertEquals(0, rangeHeader.getStartOffset());
assertEquals(9, rangeHeader.getEndOffset());
assertEquals(true, rangeHeader.isReadFull());
assertEquals(false, rangeHeader.isInValidRange());
//Last n bytes
rangeHeader = S3utils.parseRangeHeader("bytes=-6", 10);
rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=-6", 10);
assertEquals(4, rangeHeader.getStartOffset());
assertEquals(9, rangeHeader.getEndOffset());
assertEquals(false, rangeHeader.isReadFull());
assertEquals(false, rangeHeader.isInValidRange());
rangeHeader = S3utils.parseRangeHeader("bytes=-106", 10);
rangeHeader = RangeHeaderParserUtil.parseRangeHeader("bytes=-106", 10);
assertEquals(0, rangeHeader.getStartOffset());
assertEquals(9, rangeHeader.getEndOffset());
assertEquals(false, rangeHeader.isInValidRange());

View File

@ -2697,6 +2697,13 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.timelineservice.storage" +
".HBaseTimelineReaderImpl";
public static final String TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS =
TIMELINE_SERVICE_PREFIX + "schema-creator.class";
public static final String DEFAULT_TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS =
"org.apache.hadoop.yarn.server.timelineservice.storage" +
".HBaseTimelineSchemaCreator";
/**
* default schema prefix for hbase tables.
*/

View File

@ -350,8 +350,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
// record in ATS
LOG.info("Publishing component instance status {} {} ",
event.getContainerId(), containerState);
int exitStatus = failureBeforeLaunch || event.getStatus() == null ?
ContainerExitStatus.INVALID : event.getStatus().getExitStatus();
compInstance.serviceTimelinePublisher.componentInstanceFinished(
event.getContainerId(), event.getStatus().getExitStatus(),
event.getContainerId(), exitStatus,
containerState, containerDiag);
}
@ -366,8 +368,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
if (compInstance.timelineServiceEnabled) {
// record in ATS
int exitStatus = failureBeforeLaunch || event.getStatus() == null ?
ContainerExitStatus.INVALID : event.getStatus().getExitStatus();
compInstance.serviceTimelinePublisher.componentInstanceFinished(
event.getContainerId(), event.getStatus().getExitStatus(),
event.getContainerId(), exitStatus,
containerState, containerDiag);
}

View File

@ -539,8 +539,8 @@
This property allows users to set ACLs of their choice instead of using
the default mechanism. For fencing to work, the ACLs should be
carefully set differently on each ResourceManger such that all the
ResourceManagers have shared admin access and the Active ResourceManger
carefully set differently on each ResourceManager such that all the
ResourceManagers have shared admin access and the Active ResourceManager
takes over (exclusively) the create-delete access.
</description>
<name>yarn.resourcemanager.zk-state-store.root-node.acl</name>

View File

@ -437,9 +437,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptState.FAILED,
EnumSet.of(
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.LAUNCH_FAILED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.FAIL,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.STATUS_UPDATE,
RMAppAttemptEventType.CONTAINER_ALLOCATED))
@ -1203,10 +1205,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers()
.get(0));
Container amContainer = amContainerAllocation.getContainers().get(0);
RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
.getRMContainer(appAttempt.getMasterContainer().getId());
.getRMContainer(amContainer.getId());
//while one NM is removed, the scheduler will clean the container,the
//following CONTAINER_FINISHED event will handle the cleaned container.
//so just return RMAppAttemptState.SCHEDULED
if (rmMasterContainer == null) {
return RMAppAttemptState.SCHEDULED;
}
appAttempt.setMasterContainer(amContainer);
rmMasterContainer.setAMContainer(true);
// The node set in NMTokenSecrentManager is used for marking whether the
// NMToken has been issued for this node to the AM.

View File

@ -986,7 +986,7 @@ public class TestRMAppAttemptTransitions {
public void testAttemptAddedAtFinalSaving() {
submitApplicationAttempt();
// SUBNITED->FINAL_SAVING
// SUBMITTED->FINAL_SAVING
applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), RMAppAttemptEventType.KILL));
assertEquals(RMAppAttemptState.FINAL_SAVING,
@ -999,6 +999,56 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptState());
}
@Test(timeout = 10000)
public void testAttemptRegisteredAtFailed() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
//send CONTAINER_FINISHED event
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
amContainer.getId(), ContainerState.COMPLETE, "", 0,
amContainer.getResource()), anyNodeId));
assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
//send REGISTERED event
applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), RMAppAttemptEventType.REGISTERED));
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
}
@Test
public void testAttemptLaunchFailedAtFailed() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
//send CONTAINER_FINISHED event
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
amContainer.getId(), ContainerState.COMPLETE, "", 0,
amContainer.getResource()), anyNodeId));
assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
//send LAUNCH_FAILED event
applicationAttempt.handle(new RMAppAttemptEvent(applicationAttempt
.getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED));
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
}
@Test
public void testAMCrashAtAllocated() {
Container amContainer = allocateApplicationAttempt();
@ -1598,6 +1648,34 @@ public class TestRMAppAttemptTransitions {
assertTrue(found);
}
@Test
public void testContainerRemovedBeforeAllocate() {
scheduleApplicationAttempt();
// Mock the allocation of AM container
Container container = mock(Container.class);
Resource resource = BuilderUtils.newResource(2048, 1);
when(container.getId()).thenReturn(
BuilderUtils.newContainerId(applicationAttempt.getAppAttemptId(), 1));
when(container.getResource()).thenReturn(resource);
Allocation allocation = mock(Allocation.class);
when(allocation.getContainers()).
thenReturn(Collections.singletonList(container));
when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
any(List.class), any(List.class), any(List.class), any(List.class),
any(ContainerUpdates.class))).
thenReturn(allocation);
//container removed, so return null
when(scheduler.getRMContainer(container.getId())).
thenReturn(null);
applicationAttempt.handle(
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.CONTAINER_ALLOCATED));
assertEquals(RMAppAttemptState.SCHEDULED,
applicationAttempt.getAppAttemptState());
}
@SuppressWarnings("deprecation")
@Test

View File

@ -57,7 +57,7 @@ public final class DataGeneratorForTest {
// the coprocessor class is loaded from classpath
conf.set(YarnConfiguration.FLOW_RUN_COPROCESSOR_JAR_HDFS_LOCATION, " ");
// now create all tables
TimelineSchemaCreator.createAllTables(conf, false);
HBaseTimelineSchemaCreator.createAllTables(conf, false);
}
public static void loadApps(HBaseTestingUtility util, long ts)

View File

@ -56,13 +56,13 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class TimelineSchemaCreator {
private TimelineSchemaCreator() {
public final class HBaseTimelineSchemaCreator implements SchemaCreator {
public HBaseTimelineSchemaCreator() {
}
final static String NAME = TimelineSchemaCreator.class.getSimpleName();
final static String NAME = HBaseTimelineSchemaCreator.class.getSimpleName();
private static final Logger LOG =
LoggerFactory.getLogger(TimelineSchemaCreator.class);
LoggerFactory.getLogger(HBaseTimelineSchemaCreator.class);
private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
private static final String APP_METRICS_TTL_OPTION_SHORT = "ma";
private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa";
@ -74,7 +74,7 @@ public final class TimelineSchemaCreator {
private static final String HELP_SHORT = "h";
private static final String CREATE_TABLES_SHORT = "c";
public static void main(String[] args) throws Exception {
public void createTimelineSchema(String[] args) throws Exception {
LOG.info("Starting the schema creation");
Configuration hbaseConf =

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
/**
* This interface is for creating Timeline Schema. The backend for Timeline
* Service have to implement this.
*/
public interface SchemaCreator {
void createTimelineSchema(String[] args) throws Exception;
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This creates the timeline schema for storing application timeline
* information. Each backend has to implement the {@link SchemaCreator} for
* creating the schema in its backend and should be configured in yarn-site.xml.
*/
public class TimelineSchemaCreator extends Configured implements Tool {
private static final Logger LOG =
LoggerFactory.getLogger(TimelineSchemaCreator.class);
public static void main(String[] args) {
try {
int status = ToolRunner.run(new YarnConfiguration(),
new TimelineSchemaCreator(), args);
System.exit(status);
} catch (Exception e) {
LOG.error("Error while creating Timeline Schema : ", e);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
return createTimelineSchema(args, conf);
}
@VisibleForTesting
int createTimelineSchema(String[] args, Configuration conf) throws Exception {
String schemaCreatorClassName = conf.get(
YarnConfiguration.TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS);
LOG.info("Using {} for creating Timeline Service Schema ",
schemaCreatorClassName);
try {
Class<?> schemaCreatorClass = Class.forName(schemaCreatorClassName);
if (SchemaCreator.class.isAssignableFrom(schemaCreatorClass)) {
SchemaCreator schemaCreator = (SchemaCreator) ReflectionUtils
.newInstance(schemaCreatorClass, conf);
schemaCreator.createTimelineSchema(args);
return 0;
} else {
throw new YarnRuntimeException("Class: " + schemaCreatorClassName
+ " not instance of " + SchemaCreator.class.getCanonicalName());
}
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate TimelineReader: "
+ schemaCreatorClassName, e);
}
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
/**
* Dummy Implementation of {@link SchemaCreator} for test.
*/
public class DummyTimelineSchemaCreator implements SchemaCreator {
@Override
public void createTimelineSchema(String[] args) {
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Test;
/**
* Test cases for {@link TimelineSchemaCreator}.
*/
public class TestTimelineSchemaCreator {
@Test
public void testTimelineSchemaCreation() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.TIMELINE_SERVICE_SCHEMA_CREATOR_CLASS,
"org.apache.hadoop.yarn.server.timelineservice.storage" +
".DummyTimelineSchemaCreator");
TimelineSchemaCreator timelineSchemaCreator = new TimelineSchemaCreator();
Assert.assertEquals(0, timelineSchemaCreator
.createTimelineSchema(new String[]{}, conf));
}
}

View File

@ -13,7 +13,7 @@
"qunit": "1.19.0",
"jquery-ui": "1.11.4",
"moment": "2.12.0",
"moment-timezone": "0.5.0",
"moment-timezone": "0.5.1",
"more-js": "0.8.2",
"bootstrap": "3.3.6",
"d3": "~3.5.6",