HDFS-11796. Ozone: MiniOzoneCluster should set "ozone.handler.type" key correctly. Contributed by Mukul Kumar Singh.

This commit is contained in:
Anu Engineer 2017-06-02 16:24:16 -07:00
parent 5cdd880078
commit 776d2d4de7
4 changed files with 25 additions and 15 deletions

View File

@ -73,18 +73,23 @@ public class KeyManagerImpl implements KeyManager {
throw new KSMException("Bucket not found", throw new KSMException("Bucket not found",
KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
} }
// TODO throw exception if key exists, may change to support key
// overwrite in the future
//Check if key already exists.
if(metadataManager.get(keyKey) != null) {
LOG.error("key already exist: {}/{}/{} ", volumeName, bucketName,
keyName);
throw new KSMException("Key already exist",
KSMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS);
}
// TODO: Garbage collect deleted blocks due to overwrite of a key.
// FIXME: BUG: Please see HDFS-11922.
// If user overwrites a key, then we are letting it pass without
// corresponding process.
// In reality we need to garbage collect those blocks by telling SCM to
// clean up those blocks when it can. Right now making this change
// allows us to pass tests that expect ozone can overwrite a key.
// When we talk to SCM make sure that we ask for at least a byte in the
// block. This way even if the call is for a zero length key, we back it
// with a actual SCM block.
// TODO : Review this decision later. We can get away with only a
// metadata entry in case of 0 length key.
AllocatedBlock allocatedBlock = AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(args.getDataSize()); scmBlockClient.allocateBlock(Math.max(args.getDataSize(), 1));
KsmKeyInfo keyBlock = new KsmKeyInfo.Builder() KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
.setVolumeName(args.getVolumeName()) .setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName()) .setBucketName(args.getBucketName())
@ -98,7 +103,7 @@ public class KeyManagerImpl implements KeyManager {
LOG.debug("Key {} allocated in volume {} bucket {}", LOG.debug("Key {} allocated in volume {} bucket {}",
keyName, volumeName, bucketName); keyName, volumeName, bucketName);
return keyBlock; return keyBlock;
} catch (DBException ex) { } catch (Exception ex) {
LOG.error("Key allocation failed for volume:{} bucket:{} key:{}", LOG.error("Key allocation failed for volume:{} bucket:{} key:{}",
volumeName, bucketName, keyName, ex); volumeName, bucketName, keyName, ex);
throw new KSMException(ex.getMessage(), throw new KSMException(ex.getMessage(),

View File

@ -423,6 +423,8 @@ public final class MiniOzoneCluster extends MiniDFSCluster
if (!ozoneHandlerType.isPresent()) { if (!ozoneHandlerType.isPresent()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"The Ozone handler type must be specified."); "The Ozone handler type must be specified.");
} else {
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, ozoneHandlerType.get());
} }
} }

View File

@ -360,12 +360,14 @@ public class TestKeySpaceManager {
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) { try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
stream.write(dataString.getBytes()); stream.write(dataString.getBytes());
} }
// try to put the same keyArg, should raise KEY_ALREADY_EXISTS exception
exception.expect(IOException.class); // We allow the key overwrite to be successful. Please note : Till HDFS-11922
exception.expectMessage("KEY_ALREADY_EXISTS"); // is fixed this causes a data block leak on the data node side. That is
// this overwrite only overwrites the keys on KSM. We need to garbage
// collect those blocks from datanode.
KeyArgs keyArgs2 = new KeyArgs(volumeName, bucketName, keyName, userArgs); KeyArgs keyArgs2 = new KeyArgs(volumeName, bucketName, keyName, userArgs);
storageHandler.newKeyWriter(keyArgs2); storageHandler.newKeyWriter(keyArgs2);
Assert.assertEquals(1 + numKeyAllocateFails, Assert.assertEquals(numKeyAllocateFails,
ksmMetrics.getNumKeyAllocateFails()); ksmMetrics.getNumKeyAllocateFails());
} }

View File

@ -62,6 +62,7 @@ public class TestOzoneRestWithMiniCluster {
conf = new OzoneConfiguration(); conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1) cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
cluster.waitOzoneReady();
ozoneClient = cluster.createOzoneClient(); ozoneClient = cluster.createOzoneClient();
} }