HDFS-11550. Ozone: Add a check to prevent removing a container that has keys in it. Contributed by Weiwei Yang.

This commit is contained in:
Anu Engineer 2017-03-27 08:38:57 -07:00 committed by Owen O'Malley
parent 23044c1db8
commit 860a4e0044
7 changed files with 107 additions and 38 deletions

View File

@ -124,6 +124,7 @@ enum Result {
PUT_SMALL_FILE_ERROR = 20;
GET_SMALL_FILE_ERROR = 21;
CLOSED_CONTAINER_IO = 22;
ERROR_CONTAINER_NOT_EMPTY = 23;
}
message ContainerCommandRequestProto {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.helpers;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -339,15 +340,20 @@ public final class ContainerUtils {
* @param containerData - Data of the container to remove.
* @throws IOException
*/
public static void removeContainer(ContainerData containerData) throws
IOException {
public static void removeContainer(ContainerData containerData,
Configuration conf) throws IOException {
Preconditions.checkNotNull(containerData);
// TODO : Check if there are any keys. This needs to be done
// by calling into key layer code, hence this is a TODO for now.
Path dbPath = Paths.get(containerData.getDBPath());
LevelDBStore db = KeyUtils.getDB(containerData, conf);
if(!db.isEmpty()) {
throw new StorageContainerException(
"Container cannot be deleted because it is not empty.",
ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
}
// Close the DB connection and remove the DB handler from cache
KeyUtils.removeDB(containerData, conf);
// Delete the DB File.
FileUtils.forceDelete(dbPath.toFile());
dbPath = dbPath.getParent();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.common.helpers;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
@ -47,34 +48,27 @@ public final class KeyUtils {
}
/**
* Returns a file handle to LevelDB.
* Get a DB handler for a given container.
* If the handler doesn't exist in cache yet, first create one and
* add into cache. This function is called with containerManager
* ReadLock held.
*
* @param dbPath - DbPath.
* @return LevelDB
*/
public static LevelDBStore getDB(String dbPath) throws IOException {
Preconditions.checkNotNull(dbPath);
Preconditions.checkState(!dbPath.isEmpty());
return new LevelDBStore(new File(dbPath), false);
}
/**
* This function is called with containerManager ReadLock held.
*
* @param container - container.
* @param cache - cache
* @param container container.
* @param conf configuration.
* @return LevelDB handle.
* @throws StorageContainerException
*/
public static LevelDBStore getDB(ContainerData container,
ContainerCache cache)
throws StorageContainerException {
Configuration conf) throws StorageContainerException {
Preconditions.checkNotNull(container);
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
try {
LevelDBStore db = cache.getDB(container.getContainerName());
if (db == null) {
db = getDB(container.getDBPath());
db = new LevelDBStore(
new File(container.getDBPath()),
false);
cache.putDB(container.getContainerName(), db);
}
return db;
@ -85,6 +79,19 @@ public final class KeyUtils {
}
}
/**
* Remove a DB handler from cache.
*
* @param container - Container data.
* @param conf - Configuration.
*/
public static void removeDB(ContainerData container,
Configuration conf) {
Preconditions.checkNotNull(container);
ContainerCache cache = ContainerCache.getInstance(conf);
Preconditions.checkNotNull(cache);
cache.removeDB(container.getContainerName());
}
/**
* Shutdown all DB Handles.
*

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.container.common.impl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
@ -52,7 +51,7 @@ public class KeyManagerImpl implements KeyManager {
private static final float LOAD_FACTOR = 0.75f;
private final ContainerManager containerManager;
private final ContainerCache containerCache;
private final Configuration conf;
/**
* Constructs a key Manager.
@ -63,10 +62,8 @@ public class KeyManagerImpl implements KeyManager {
Preconditions.checkNotNull(containerManager, "Container manager cannot be" +
" null");
Preconditions.checkNotNull(conf, "Config cannot be null");
int cacheSize = conf.getInt(OzoneConfigKeys.OZONE_KEY_CACHE,
OzoneConfigKeys.OZONE_KEY_CACHE_DEFAULT);
this.containerManager = containerManager;
containerCache = new ContainerCache(cacheSize, LOAD_FACTOR, true);
this.conf = conf;
}
/**
@ -84,7 +81,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(
pipeline.getContainerName());
LevelDBStore db = KeyUtils.getDB(cData, containerCache);
LevelDBStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@ -109,7 +106,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(data
.getContainerName());
LevelDBStore db = KeyUtils.getDB(cData, containerCache);
LevelDBStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@ -143,7 +140,7 @@ public class KeyManagerImpl implements KeyManager {
"Container name cannot be null");
ContainerData cData = containerManager.readContainer(pipeline
.getContainerName());
LevelDBStore db = KeyUtils.getDB(cData, containerCache);
LevelDBStore db = KeyUtils.getDB(cData, conf);
// This is a post condition that acts as a hint to the user.
// Should never fail.
@ -181,6 +178,6 @@ public class KeyManagerImpl implements KeyManager {
public void shutdown() {
Preconditions.checkState(this.containerManager.hasWriteLock(), "asserts " +
"that we are holding the container manager lock when shutting down.");
KeyUtils.shutdownCache(containerCache);
KeyUtils.shutdownCache(ContainerCache.getInstance(conf));
}
}

View File

@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.utils.LevelDBStore;
import java.io.IOException;
@ -31,18 +33,35 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* container cache is a LRUMap that maintains the DB handles.
*/
public class ContainerCache extends LRUMap {
public final class ContainerCache extends LRUMap {
static final Log LOG = LogFactory.getLog(ContainerCache.class);
private final Lock lock = new ReentrantLock();
private static ContainerCache cache;
private static final float LOAD_FACTOR = 0.75f;
/**
* Constructs a cache that holds DBHandle references.
*/
public ContainerCache(int maxSize, float loadFactor, boolean
private ContainerCache(int maxSize, float loadFactor, boolean
scanUntilRemovable) {
super(maxSize, loadFactor, scanUntilRemovable);
}
/**
* Return a singleton instance of {@link ContainerCache}
* that holds the DB handlers.
*
* @param conf - Configuration.
* @return A instance of {@link ContainerCache}.
*/
public synchronized static ContainerCache getInstance(Configuration conf) {
if (cache == null) {
int cacheSize = conf.getInt(OzoneConfigKeys.OZONE_KEY_CACHE,
OzoneConfigKeys.OZONE_KEY_CACHE_DEFAULT);
cache = new ContainerCache(cacheSize, LOAD_FACTOR, true);
}
return cache;
}
/**
* {@inheritDoc}
*/
@ -77,6 +96,30 @@ public class ContainerCache extends LRUMap {
}
}
/**
* Remove a DB handler from cache.
*
* @param containerName - Name of the container.
*/
public void removeDB(String containerName) {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
lock.lock();
try {
LevelDBStore db = this.getDB(containerName);
if (db != null) {
try {
db.close();
} catch (IOException e) {
LOG.warn("There is some issue to stop an unused DB handler.", e);
}
}
this.remove(containerName);
} finally {
lock.unlock();
}
}
/**
* Add a new DB to the cache.
*

View File

@ -120,7 +120,7 @@ public class LevelDBStore {
DBIterator iter = db.iterator();
try {
iter.seekToFirst();
return iter.hasNext();
return !iter.hasNext();
} finally {
iter.close();
}

View File

@ -210,7 +210,6 @@ public class TestContainerPersistence {
containerManager.createContainer(createSingleNodePipeline(containerName2),
data);
Assert.assertTrue(containerManager.getContainerMap()
.containsKey(containerName1));
Assert.assertTrue(containerManager.getContainerMap()
@ -236,6 +235,22 @@ public class TestContainerPersistence {
Assert.assertTrue(containerManager.getContainerMap()
.containsKey(containerName2));
// Add some key to a container and then delete.
// Delete should fail because the container is no longer empty.
KeyData someKey = new KeyData(containerName1, "someKey");
someKey.setChunks(new LinkedList<ContainerProtos.ChunkInfo>());
keyManager.putKey(
createSingleNodePipeline(containerName1),
someKey);
exception.expect(StorageContainerException.class);
exception.expectMessage(
"Container cannot be deleted because it is not empty.");
containerManager.deleteContainer(
createSingleNodePipeline(containerName1),
containerName1);
Assert.assertTrue(containerManager.getContainerMap()
.containsKey(containerName1));
}
/**