HDFS-6720. Remove KeyProvider in EncryptionZoneManager. (wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1612632 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-07-22 17:57:06 +00:00
parent b52b80d7bd
commit 69b75fca7a
18 changed files with 99 additions and 114 deletions

View File

@ -54,6 +54,8 @@ fs-encryption (Unreleased)
HDFS-6718. Remove EncryptionZoneManager lock. (wang)
HDFS-6720. Remove KeyProvider in EncryptionZoneManager. (wang)
OPTIMIZATIONS
BUG FIXES

View File

@ -2807,11 +2807,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
public void createEncryptionZone(String src, String keyId)
public void createEncryptionZone(String src, String keyName)
throws IOException {
checkOpen();
try {
namenode.createEncryptionZone(src, keyId);
namenode.createEncryptionZone(src, keyName);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
SafeModeException.class,

View File

@ -1799,9 +1799,9 @@ public class DistributedFileSystem extends FileSystem {
}
/* HDFS only */
public void createEncryptionZone(Path path, String keyId)
public void createEncryptionZone(Path path, String keyName)
throws IOException {
dfs.createEncryptionZone(getPathName(path), keyId);
dfs.createEncryptionZone(getPathName(path), keyName);
}
/* HDFS only */

View File

@ -231,21 +231,16 @@ public class HdfsAdmin {
}
/**
* Create an encryption zone rooted at path using the optional encryption key
* id. An encryption zone is a portion of the HDFS file system hierarchy in
* which all files are encrypted with the same key, but possibly different
* key versions per file.
* <p/>
* Path must refer to an empty, existing directory. Otherwise an IOException
* will be thrown. keyId specifies the id of an encryption key in the
* KeyProvider that the Namenode has been configured to use. If keyId is
* null, then a key is generated in the KeyProvider using {@link
* java.util.UUID} to generate a key id.
* Create an encryption zone rooted at an empty existing directory. An
* encryption zone has an associated encryption key used when reading and
* writing files within the zone. An existing key can be specified,
* else a new key will be generated for the encryption zone.
*
* @param path The path of the root of the encryption zone.
* @param path The path of the root of the encryption zone. Must refer to
* an empty, existing directory.
*
* @param keyId An optional keyId in the KeyProvider. If null, then
* a key is generated.
* @param keyName Optional name of key available at the KeyProvider. If null,
* then a key is generated.
*
* @throws IOException if there was a general IO exception
*
@ -253,18 +248,15 @@ public class HdfsAdmin {
*
* @throws FileNotFoundException if the path does not exist
*/
public void createEncryptionZone(Path path, String keyId)
public void createEncryptionZone(Path path, String keyName)
throws IOException, AccessControlException, FileNotFoundException {
dfs.createEncryptionZone(path, keyId);
dfs.createEncryptionZone(path, keyName);
}
/**
* Return a list of all {@EncryptionZone}s in the HDFS hierarchy which are
* visible to the caller. If the caller is the HDFS admin, then the returned
* EncryptionZone instances will have the key id field filled in. If the
* caller is not the HDFS admin, then the EncryptionZone instances will only
* have the path field filled in and only those zones that are visible to the
* user are returned.
* Return a list of all {@link EncryptionZone}s in the HDFS hierarchy which
* are visible to the caller. If the caller is an HDFS superuser,
* then the key name of each encryption zone will also be provided.
*
* @throws IOException if there was a general IO exception
*

View File

@ -1263,7 +1263,7 @@ public interface ClientProtocol {
* Create an encryption zone
*/
@AtMostOnce
public void createEncryptionZone(String src, String keyId)
public void createEncryptionZone(String src, String keyName)
throws IOException;
/**

View File

@ -24,32 +24,32 @@ import org.apache.hadoop.classification.InterfaceStability;
/**
* A simple class for representing an encryption zone. Presently an encryption
* zone only has a path (the root of the encryption zone) and a key id.
* zone only has a path (the root of the encryption zone) and a key name.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class EncryptionZone {
private final String path;
private final String keyId;
private final String keyName;
public EncryptionZone(String path, String keyId) {
public EncryptionZone(String path, String keyName) {
this.path = path;
this.keyId = keyId;
this.keyName = keyName;
}
public String getPath() {
return path;
}
public String getKeyId() {
return keyId;
public String getKeyName() {
return keyName;
}
@Override
public int hashCode() {
return new HashCodeBuilder(13, 31).
append(path).append(keyId).
append(path).append(keyName).
toHashCode();
}
@ -68,12 +68,12 @@ public class EncryptionZone {
EncryptionZone rhs = (EncryptionZone) obj;
return new EqualsBuilder().
append(path, rhs.path).
append(keyId, rhs.keyId).
append(keyName, rhs.keyName).
isEquals();
}
@Override
public String toString() {
return "EncryptionZone [path=" + path + ", keyId=" + keyId + "]";
return "EncryptionZone [path=" + path + ", keyName=" + keyName + "]";
}
}

View File

@ -1287,7 +1287,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, CreateEncryptionZoneRequestProto req)
throws ServiceException {
try {
server.createEncryptionZone(req.getSrc(), req.getKeyId());
server.createEncryptionZone(req.getSrc(), req.getKeyName());
return CreateEncryptionZoneResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);

View File

@ -1284,13 +1284,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public void createEncryptionZone(String src, String keyId)
public void createEncryptionZone(String src, String keyName)
throws IOException {
final CreateEncryptionZoneRequestProto.Builder builder =
CreateEncryptionZoneRequestProto.newBuilder();
builder.setSrc(src);
if (keyId != null && !keyId.isEmpty()) {
builder.setKeyId(keyId);
if (keyName != null && !keyName.isEmpty()) {
builder.setKeyName(keyName);
}
CreateEncryptionZoneRequestProto req = builder.build();
try {

View File

@ -2209,7 +2209,7 @@ public class PBHelper {
final EncryptionZoneProto.Builder builder =
EncryptionZoneProto.newBuilder();
builder.setPath(a.getPath());
builder.setKeyId(a.getKeyId());
builder.setKeyName(a.getKeyName());
ret.add(builder.build());
}
return ret;
@ -2221,7 +2221,7 @@ public class PBHelper {
Lists.newArrayListWithCapacity(ezs.size());
for (EncryptionZoneProto a : ezs) {
final EncryptionZone ez =
new EncryptionZone(a.getPath(), a.getKeyId());
new EncryptionZone(a.getPath(), a.getKeyName());
ret.add(ez);
}
return ret;

View File

@ -5,11 +5,9 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
@ -20,7 +18,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants
.CRYPTO_XATTR_ENCRYPTION_ZONE;
@ -62,16 +59,14 @@ public class EncryptionZoneManager {
private final Map<Long, EncryptionZoneInt> encryptionZones;
private final FSDirectory dir;
private final KeyProvider provider;
/**
* Construct a new EncryptionZoneManager.
*
* @param dir Enclosing FSDirectory
*/
public EncryptionZoneManager(FSDirectory dir, KeyProvider provider) {
public EncryptionZoneManager(FSDirectory dir) {
this.dir = dir;
this.provider = provider;
encryptionZones = new HashMap<Long, EncryptionZoneInt>();
}
@ -81,11 +76,11 @@ public class EncryptionZoneManager {
* Called while holding the FSDirectory lock.
*
* @param inodeId of the encryption zone
* @param keyId encryption zone key id
* @param keyName encryption zone key name
*/
void addEncryptionZone(Long inodeId, String keyId) {
void addEncryptionZone(Long inodeId, String keyName) {
assert dir.hasWriteLock();
final EncryptionZoneInt ez = new EncryptionZoneInt(inodeId, keyId);
final EncryptionZoneInt ez = new EncryptionZoneInt(inodeId, keyName);
encryptionZones.put(inodeId, ez);
}
@ -209,7 +204,7 @@ public class EncryptionZoneManager {
* <p/>
* Called while holding the FSDirectory lock.
*/
XAttr createEncryptionZone(String src, String keyId, KeyVersion keyVersion)
XAttr createEncryptionZone(String src, String keyName)
throws IOException {
assert dir.hasWriteLock();
if (dir.isNonEmptyDirectory(src)) {
@ -224,17 +219,16 @@ public class EncryptionZoneManager {
"encryption zone. (" + getFullPathName(ezi) + ")");
}
final XAttr keyIdXAttr = XAttrHelper
.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, keyId.getBytes());
final XAttr ezXAttr = XAttrHelper
.buildXAttr(CRYPTO_XATTR_ENCRYPTION_ZONE, keyName.getBytes());
final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
xattrs.add(keyIdXAttr);
xattrs.add(ezXAttr);
// updating the xattr will call addEncryptionZone,
// done this way to handle edit log loading
dir.unprotectedSetXAttrs(src, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
// Re-get the new encryption zone add the latest key version
ezi = getEncryptionZoneForPath(srcIIP);
return keyIdXAttr;
return ezXAttr;
}
/**

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_FILE_ENCRYPTION_INFO;
import static org.apache.hadoop.util.Time.now;
@ -36,7 +35,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileEncryptionInfo;
@ -227,7 +225,7 @@ public class FSDirectory implements Closeable {
nameCache = new NameCache<ByteArray>(threshold);
namesystem = ns;
ezManager = new EncryptionZoneManager(this, ns.getProvider());
ezManager = new EncryptionZoneManager(this);
}
private FSNamesystem getFSNamesystem() {
@ -2632,11 +2630,11 @@ public class FSDirectory implements Closeable {
}
}
XAttr createEncryptionZone(String src, String keyId, KeyVersion keyVersion)
XAttr createEncryptionZone(String src, String keyName)
throws IOException {
writeLock();
try {
return ezManager.createEncryptionZone(src, keyId, keyVersion);
return ezManager.createEncryptionZone(src, keyName);
} finally {
writeUnlock();
}

View File

@ -8421,13 +8421,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
/**
* Create an encryption zone on directory src either using keyIdArg if
* supplied or generating a keyId if it's null.
* Create an encryption zone on directory src. If provided,
* will use an existing key, else will generate a new key.
*
* @param src the path of a directory which will be the root of the
* encryption zone. The directory must be empty.
*
* @param keyIdArg an optional keyId of a key in the configured
* @param keyNameArg an optional name of a key in the configured
* KeyProvider. If this is null, then a a new key is generated.
*
* @throws AccessControlException if the caller is not the superuser.
@ -8436,7 +8436,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*
* @throws SafeModeException if the Namenode is in safe mode.
*/
void createEncryptionZone(final String src, String keyIdArg)
void createEncryptionZone(final String src, String keyNameArg)
throws IOException, UnresolvedLinkException,
SafeModeException, AccessControlException {
final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
@ -8445,16 +8445,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
boolean createdKey = false;
String keyId = keyIdArg;
String keyName = keyNameArg;
boolean success = false;
try {
KeyVersion keyVersion;
if (keyId == null || keyId.isEmpty()) {
keyId = UUID.randomUUID().toString();
keyVersion = createNewKey(keyId, src);
if (keyName == null || keyName.isEmpty()) {
keyName = UUID.randomUUID().toString();
createNewKey(keyName, src);
createdKey = true;
} else {
keyVersion = provider.getCurrentKey(keyId);
KeyVersion keyVersion = provider.getCurrentKey(keyName);
if (keyVersion == null) {
/*
* It would be nice if we threw something more specific than
@ -8464,10 +8463,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* update this to match it, or better yet, just rethrow the
* KeyProvider's exception.
*/
throw new IOException("Key " + keyId + " doesn't exist.");
throw new IOException("Key " + keyName + " doesn't exist.");
}
}
createEncryptionZoneInt(src, keyId, keyVersion, cacheEntry != null);
createEncryptionZoneInt(src, keyName, cacheEntry != null);
success = true;
} catch (AccessControlException e) {
logAuditEvent(false, "createEncryptionZone", src);
@ -8476,14 +8475,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
RetryCache.setState(cacheEntry, success);
if (!success && createdKey) {
/* Unwind key creation. */
provider.deleteKey(keyId);
provider.deleteKey(keyName);
}
}
}
private void createEncryptionZoneInt(final String srcArg, String keyId,
final KeyVersion keyVersion, final boolean logRetryCache) throws
IOException {
private void createEncryptionZoneInt(final String srcArg, String keyName,
final boolean logRetryCache) throws IOException {
String src = srcArg;
HdfsFileStatus resultingStat = null;
checkSuperuserPrivilege();
@ -8497,9 +8495,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkNameNodeSafeMode("Cannot create encryption zone on " + src);
src = FSDirectory.resolvePath(src, pathComponents, dir);
final XAttr keyIdXAttr = dir.createEncryptionZone(src, keyId, keyVersion);
final XAttr ezXAttr = dir.createEncryptionZone(src, keyName);
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
xAttrs.add(keyIdXAttr);
xAttrs.add(ezXAttr);
getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
resultingStat = getAuditFileInfo(src, false);
} finally {
@ -8512,14 +8510,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/**
* Create a new key on the KeyProvider for an encryption zone.
*
* @param keyIdArg id of the key
* @param keyNameArg name of the key
* @param src path of the encryption zone.
* @return KeyVersion of the created key
* @throws IOException
*/
private KeyVersion createNewKey(String keyIdArg, String src)
private KeyVersion createNewKey(String keyNameArg, String src)
throws IOException {
Preconditions.checkNotNull(keyIdArg);
Preconditions.checkNotNull(keyNameArg);
Preconditions.checkNotNull(src);
final StringBuilder sb = new StringBuilder("hdfs://");
if (nameserviceId != null) {
@ -8529,14 +8527,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (!src.endsWith("/")) {
sb.append('/');
}
sb.append(keyIdArg);
final String keyId = sb.toString();
providerOptions.setDescription(keyId);
sb.append(keyNameArg);
final String keyName = sb.toString();
providerOptions.setDescription(keyName);
providerOptions.setBitLength(codec.getCipherSuite()
.getAlgorithmBlockSize()*8);
KeyVersion version = null;
try {
version = provider.createKey(keyIdArg, providerOptions);
version = provider.createKey(keyNameArg, providerOptions);
} catch (NoSuchAlgorithmException e) {
throw new IOException(e);
}

View File

@ -1413,9 +1413,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override
public void createEncryptionZone(String src, String keyId)
public void createEncryptionZone(String src, String keyName)
throws IOException {
namesystem.createEncryptionZone(src, keyId);
namesystem.createEncryptionZone(src, keyName);
}
@Override

View File

@ -124,7 +124,7 @@ public class CryptoAdmin extends Configured implements Tool {
@Override
public String getShortUsage() {
return "[" + getName() + " [-keyId <keyId>] -path <path> " + "]\n";
return "[" + getName() + " [-keyName <keyName>] -path <path> " + "]\n";
}
@Override
@ -132,7 +132,8 @@ public class CryptoAdmin extends Configured implements Tool {
final TableListing listing = getOptionDescriptionListing();
listing.addRow("<path>", "The path of the encryption zone to create. " +
"It must be an empty directory.");
listing.addRow("<keyId>", "The keyId of the new encryption zone.");
listing.addRow("<keyName>", "Name of the key to use for the " +
"encryption zone. A new key will be generated if unspecified.");
return getShortUsage() + "\n" +
"Create a new encryption zone.\n\n" +
listing.toString();
@ -146,8 +147,8 @@ public class CryptoAdmin extends Configured implements Tool {
return 1;
}
final String keyId =
StringUtils.popOptionWithArgument("-keyId", args);
final String keyName =
StringUtils.popOptionWithArgument("-keyName", args);
if (!args.isEmpty()) {
System.err.println("Can't understand argument: " + args.get(0));
@ -156,7 +157,7 @@ public class CryptoAdmin extends Configured implements Tool {
final DistributedFileSystem dfs = getDFS(conf);
try {
dfs.createEncryptionZone(new Path(path), keyId);
dfs.createEncryptionZone(new Path(path), keyName);
System.out.println("Added encryption zone " + path);
} catch (IOException e) {
System.err.println(prettifyException(e));
@ -198,7 +199,7 @@ public class CryptoAdmin extends Configured implements Tool {
.wrapWidth(MAX_LINE_WIDTH).hideHeaders().build();
final List<EncryptionZone> ezs = dfs.listEncryptionZones();
for (EncryptionZone ez : ezs) {
listing.addRow(ez.getPath(), ez.getKeyId());
listing.addRow(ez.getPath(), ez.getKeyName());
}
System.out.println(listing.toString());
} catch (IOException e) {

View File

@ -35,7 +35,7 @@ import "hdfs.proto";
message CreateEncryptionZoneRequestProto {
required string src = 1;
optional string keyId = 2;
optional string keyName = 2;
}
message CreateEncryptionZoneResponseProto {
@ -46,7 +46,7 @@ message ListEncryptionZonesRequestProto {
message EncryptionZoneProto {
required string path = 1;
required string keyId = 2;
required string keyName = 2;
}
message ListEncryptionZonesResponseProto {

View File

@ -93,12 +93,12 @@ public class TestCryptoCLI extends CLITestHelperDFS {
}
/* Helper function to create a key in the Key Provider. */
private void createAKey(String keyId, Configuration conf)
private void createAKey(String keyName, Configuration conf)
throws NoSuchAlgorithmException, IOException {
final KeyProvider provider =
dfsCluster.getNameNode().getNamesystem().getProvider();
final KeyProvider.Options options = KeyProvider.options(conf);
provider.createKey(keyId, options);
provider.createKey(keyName, options);
provider.flush();
}

View File

@ -105,18 +105,18 @@ public class TestEncryptionZones {
}
/**
* Checks that an encryption zone with the specified keyId and path (if not
* Checks that an encryption zone with the specified keyName and path (if not
* null) is present.
*
* @throws IOException if a matching zone could not be found
*/
public void assertZonePresent(String keyId, String path) throws IOException {
public void assertZonePresent(String keyName, String path) throws IOException {
final List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
boolean match = false;
for (EncryptionZone zone : zones) {
boolean matchKey = (keyId == null);
boolean matchKey = (keyName == null);
boolean matchPath = (path == null);
if (keyId != null && zone.getKeyId().equals(keyId)) {
if (keyName != null && zone.getKeyName().equals(keyName)) {
matchKey = true;
}
if (path != null && zone.getPath().equals(path)) {
@ -127,7 +127,7 @@ public class TestEncryptionZones {
break;
}
}
assertTrue("Did not find expected encryption zone with keyId " + keyId +
assertTrue("Did not find expected encryption zone with keyName " + keyName +
" path " + path, match
);
}
@ -135,11 +135,11 @@ public class TestEncryptionZones {
/**
* Helper function to create a key in the Key Provider.
*/
private void createKey(String keyId)
private void createKey(String keyName)
throws NoSuchAlgorithmException, IOException {
KeyProvider provider = cluster.getNameNode().getNamesystem().getProvider();
final KeyProvider.Options options = KeyProvider.options(conf);
provider.createKey(keyId, options);
provider.createKey(keyName, options);
provider.flush();
}
@ -204,9 +204,9 @@ public class TestEncryptionZones {
/* Test failure of creating an EZ passing a key that doesn't exist. */
final Path zone2 = new Path("/zone2");
fsWrapper.mkdir(zone2, FsPermission.getDirDefault(), false);
final String myKeyId = "mykeyid";
final String myKeyName = "mykeyname";
try {
dfsAdmin.createEncryptionZone(zone2, myKeyId);
dfsAdmin.createEncryptionZone(zone2, myKeyName);
fail("expected key doesn't exist");
} catch (IOException e) {
assertExceptionContains("doesn't exist.", e);
@ -214,10 +214,10 @@ public class TestEncryptionZones {
assertNumZones(1);
/* Test success of creating an EZ when they key exists. */
createKey(myKeyId);
dfsAdmin.createEncryptionZone(zone2, myKeyId);
createKey(myKeyName);
dfsAdmin.createEncryptionZone(zone2, myKeyName);
assertNumZones(++numZones);
assertZonePresent(myKeyId, zone2.toString());
assertZonePresent(myKeyName, zone2.toString());
/* Test failure of create encryption zones as a non super user. */
final UserGroupInformation user = UserGroupInformation.
@ -345,8 +345,8 @@ public class TestEncryptionZones {
// Roll the key of the encryption zone
List<EncryptionZone> zones = dfsAdmin.listEncryptionZones();
assertEquals("Expected 1 EZ", 1, zones.size());
String keyId = zones.get(0).getKeyId();
cluster.getNamesystem().getProvider().rollNewVersion(keyId);
String keyName = zones.get(0).getKeyName();
cluster.getNamesystem().getProvider().rollNewVersion(keyName);
// Read them back in and compare byte-by-byte
validateFiles(baseFile, encFile1, len);
// Write a new enc file and validate

View File

@ -145,7 +145,7 @@
<test-commands>
<command>-fs NAMENODE -mkdir /foo</command>
<command>-fs NAMENODE -ls /</command>-
<crypto-admin-command>-createZone -path /foo -keyId doesntexist</crypto-admin-command>
<crypto-admin-command>-createZone -path /foo -keyName doesntexist</crypto-admin-command>
</test-commands>
<cleanup-commands>
<command>-fs NAMENODE -rmdir /foo</command>
@ -163,7 +163,7 @@
<test-commands>
<command>-fs NAMENODE -mkdir /foo</command>
<command>-fs NAMENODE -ls /</command>-
<crypto-admin-command>-createZone -path /foo -keyId mykey</crypto-admin-command>
<crypto-admin-command>-createZone -path /foo -keyName mykey</crypto-admin-command>
</test-commands>
<cleanup-commands>
<command>-fs NAMENODE -rmdir /foo</command>