HDFS-10250. Ozone: Add key Persistence. Contributed by Anu Engineer.
This commit is contained in:
parent
c6fd5ea3f0
commit
0addb1033e
|
@ -54,6 +54,10 @@ public final class OzoneConfigKeys {
|
|||
public static final String DFS_OZONE_METADATA_DIRS =
|
||||
"dfs.ozone.metadata.dirs";
|
||||
|
||||
public static final String DFS_OZONE_KEY_CACHE = "dfs.ozone.key.cache.size";
|
||||
public static final int DFS_OZONE_KEY_CACHE_DEFAULT = 1024;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* There is no need to instantiate this class.
|
||||
|
|
|
@ -216,7 +216,7 @@ public final class ChunkUtils {
|
|||
* Reads data from an existing chunk file.
|
||||
*
|
||||
* @param chunkFile - file where data lives.
|
||||
* @param data - chunk defintion.
|
||||
* @param data - chunk definition.
|
||||
* @return ByteBuffer
|
||||
* @throws IOException
|
||||
* @throws ExecutionException
|
||||
|
@ -284,8 +284,8 @@ public final class ChunkUtils {
|
|||
byte[] data, ChunkInfo info) {
|
||||
Preconditions.checkNotNull(msg);
|
||||
|
||||
ContainerProtos.ReadChunkReponseProto.Builder response =
|
||||
ContainerProtos.ReadChunkReponseProto.newBuilder();
|
||||
ContainerProtos.ReadChunkResponseProto.Builder response =
|
||||
ContainerProtos.ReadChunkResponseProto.newBuilder();
|
||||
response.setChunkData(info.getProtoBufMessage());
|
||||
response.setData(ByteString.copyFrom(data));
|
||||
response.setPipeline(msg.getReadChunk().getPipeline());
|
||||
|
|
|
@ -170,9 +170,9 @@ public class ContainerData {
|
|||
}
|
||||
|
||||
/**
|
||||
* This function serves as the generic key for OzoneCache class. Both
|
||||
* This function serves as the generic key for ContainerCache class. Both
|
||||
* ContainerData and ContainerKeyData overrides this function to appropriately
|
||||
* return the right name that can be used in OzoneCache.
|
||||
* return the right name that can be used in ContainerCache.
|
||||
*
|
||||
* @return String Name.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.helpers;
|
||||
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
/**
|
||||
* Helper class to convert Protobuf to Java classes.
|
||||
*/
|
||||
public class KeyData {
|
||||
private final String containerName;
|
||||
private final String keyName;
|
||||
private final Map<String, String> metadata;
|
||||
|
||||
/**
|
||||
* Please note : when we are working with keys, we don't care what they point
|
||||
* to. So we We don't read chunkinfo nor validate them. It is responsibility
|
||||
* of higher layer like ozone. We just read and write data from network.
|
||||
*/
|
||||
private List<ContainerProtos.ChunkInfo> chunks;
|
||||
|
||||
/**
|
||||
* Constructs a KeyData Object.
|
||||
*
|
||||
* @param containerName
|
||||
* @param keyName
|
||||
*/
|
||||
public KeyData(String containerName, String keyName) {
|
||||
this.containerName = containerName;
|
||||
this.keyName = keyName;
|
||||
this.metadata = new TreeMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a keyData object from the protobuf data.
|
||||
*
|
||||
* @param data - Protobuf data.
|
||||
* @return - KeyData
|
||||
* @throws IOException
|
||||
*/
|
||||
public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws
|
||||
IOException {
|
||||
KeyData keyData = new KeyData(data.getContainerName(), data.getName());
|
||||
for (int x = 0; x < data.getMetadataCount(); x++) {
|
||||
keyData.addMetadata(data.getMetadata(x).getKey(),
|
||||
data.getMetadata(x).getValue());
|
||||
}
|
||||
keyData.setChunks(data.getChunksList());
|
||||
return keyData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Protobuf message from KeyData.
|
||||
* @return Proto Buf Message.
|
||||
*/
|
||||
public ContainerProtos.KeyData getProtoBufMessage() {
|
||||
ContainerProtos.KeyData.Builder builder =
|
||||
ContainerProtos.KeyData.newBuilder();
|
||||
builder.setContainerName(this.containerName);
|
||||
builder.setName(this.getKeyName());
|
||||
builder.addAllChunks(this.chunks);
|
||||
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
||||
ContainerProtos.KeyValue.Builder keyValBuilder =
|
||||
ContainerProtos.KeyValue.newBuilder();
|
||||
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
|
||||
.setValue(entry.getValue()).build());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds metadata.
|
||||
*
|
||||
* @param key - Key
|
||||
* @param value - Value
|
||||
* @throws IOException
|
||||
*/
|
||||
public synchronized void addMetadata(String key, String value) throws
|
||||
IOException {
|
||||
if (this.metadata.containsKey(key)) {
|
||||
throw new IOException("This key already exists. Key " + key);
|
||||
}
|
||||
metadata.put(key, value);
|
||||
}
|
||||
|
||||
public synchronized Map<String, String> getMetadata() {
|
||||
return Collections.unmodifiableMap(this.metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns value of a key.
|
||||
*/
|
||||
public synchronized String getValue(String key) {
|
||||
return metadata.get(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a metadata entry from the map.
|
||||
*
|
||||
* @param key - Key
|
||||
*/
|
||||
public synchronized void deleteKey(String key) {
|
||||
metadata.remove(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns chunks list.
|
||||
*
|
||||
* @return list of chunkinfo.
|
||||
*/
|
||||
public List<ContainerProtos.ChunkInfo> getChunks() {
|
||||
return chunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns container Name.
|
||||
* @return String.
|
||||
*/
|
||||
public String getContainerName() {
|
||||
return containerName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns KeyName.
|
||||
* @return String.
|
||||
*/
|
||||
public String getKeyName() {
|
||||
return keyName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets Chunk list.
|
||||
*
|
||||
* @param chunks - List of chunks.
|
||||
*/
|
||||
public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
|
||||
this.chunks = chunks;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.helpers;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
|
||||
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
/**
|
||||
* Utils functions to help key functions.
|
||||
*/
|
||||
public final class KeyUtils {
|
||||
public static final String ENCODING_NAME = "UTF-8";
|
||||
public static final Charset ENCODING = Charset.forName(ENCODING_NAME);
|
||||
|
||||
/**
|
||||
* Never Constructed.
|
||||
*/
|
||||
private KeyUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a file handle to LevelDB.
|
||||
*
|
||||
* @param dbPath - DbPath.
|
||||
* @return LevelDB
|
||||
*/
|
||||
public static LevelDBStore getDB(String dbPath) throws IOException {
|
||||
Preconditions.checkNotNull(dbPath);
|
||||
Preconditions.checkState(!dbPath.isEmpty());
|
||||
return new LevelDBStore(new File(dbPath), false);
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is called with containerManager ReadLock held.
|
||||
*
|
||||
* @param container - container.
|
||||
* @param cache - cache
|
||||
* @return LevelDB handle.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static LevelDBStore getDB(ContainerData container,
|
||||
ContainerCache cache) throws IOException {
|
||||
Preconditions.checkNotNull(container);
|
||||
Preconditions.checkNotNull(cache);
|
||||
LevelDBStore db = cache.getDB(container.getContainerName());
|
||||
if (db == null) {
|
||||
db = getDB(container.getDBPath());
|
||||
cache.putDB(container.getContainerName(), db);
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns successful keyResponse.
|
||||
* @param msg - Request.
|
||||
* @return Response.
|
||||
*/
|
||||
public static ContainerProtos.ContainerCommandResponseProto
|
||||
getKeyResponse(ContainerProtos.ContainerCommandRequestProto msg) {
|
||||
return ContainerUtils.getContainerResponse(msg);
|
||||
}
|
||||
|
||||
|
||||
public static ContainerProtos.ContainerCommandResponseProto
|
||||
getKeyDataResponse(ContainerProtos.ContainerCommandRequestProto msg
|
||||
, KeyData data) {
|
||||
ContainerProtos.GetKeyResponseProto.Builder getKey = ContainerProtos
|
||||
.GetKeyResponseProto.newBuilder();
|
||||
getKey.setKeyData(data.getProtoBufMessage());
|
||||
ContainerProtos.ContainerCommandResponseProto.Builder builder =
|
||||
ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
|
||||
.SUCCESS, "");
|
||||
builder.setGetKey(getKey);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
|||
import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -72,6 +73,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
|
||||
private ContainerLocationManager locationManager;
|
||||
private ChunkManager chunkManager;
|
||||
private KeyManager keyManager;
|
||||
|
||||
/**
|
||||
* Init call that sets up a container Manager.
|
||||
|
@ -464,6 +466,26 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
return this.chunkManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Key Manager.
|
||||
*
|
||||
* @param keyManager - Key Manager.
|
||||
*/
|
||||
@Override
|
||||
public void setKeyManager(KeyManager keyManager) {
|
||||
this.keyManager = keyManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the Key Manager.
|
||||
*
|
||||
* @return KeyManager.
|
||||
*/
|
||||
@Override
|
||||
public KeyManager getKeyManager() {
|
||||
return this.keyManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter out only container files from the container metadata dir.
|
||||
*/
|
||||
|
|
|
@ -20,13 +20,17 @@ package org.apache.hadoop.ozone.container.common.impl;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||
.ContainerCommandRequestProto;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||
.ContainerCommandResponseProto;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
|
@ -69,6 +73,12 @@ public class Dispatcher implements ContainerDispatcher {
|
|||
return containerProcessHandler(msg);
|
||||
}
|
||||
|
||||
if ((cmdType == Type.PutKey) ||
|
||||
(cmdType == Type.GetKey) ||
|
||||
(cmdType == Type.DeleteKey) ||
|
||||
(cmdType == Type.ListKey)) {
|
||||
return keyProcessHandler(msg);
|
||||
}
|
||||
|
||||
if ((cmdType == Type.WriteChunk) ||
|
||||
(cmdType == Type.ReadChunk) ||
|
||||
|
@ -126,6 +136,48 @@ public class Dispatcher implements ContainerDispatcher {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the all key related functionality.
|
||||
*
|
||||
* @param msg - command
|
||||
* @return - response
|
||||
* @throws IOException
|
||||
*/
|
||||
private ContainerCommandResponseProto keyProcessHandler(
|
||||
ContainerCommandRequestProto msg) throws IOException {
|
||||
try {
|
||||
switch (msg.getCmdType()) {
|
||||
case PutKey:
|
||||
return handlePutKey(msg);
|
||||
|
||||
case GetKey:
|
||||
return handleGetKey(msg);
|
||||
|
||||
case DeleteKey:
|
||||
return handleDeleteKey(msg);
|
||||
|
||||
case ListKey:
|
||||
return ContainerUtils.unsupportedRequest(msg);
|
||||
|
||||
default:
|
||||
return ContainerUtils.unsupportedRequest(msg);
|
||||
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Container operation failed. " +
|
||||
"Container: {} Operation: {} trace ID: {} Error: {}",
|
||||
msg.getCreateContainer().getContainerData().getName(),
|
||||
msg.getCmdType().name(),
|
||||
msg.getTraceID(),
|
||||
ex.toString());
|
||||
|
||||
// TODO : Replace with finer error codes.
|
||||
return ContainerUtils.getContainerResponse(msg,
|
||||
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
|
||||
ex.toString()).build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the all chunk related functionality.
|
||||
*
|
||||
|
@ -136,7 +188,6 @@ public class Dispatcher implements ContainerDispatcher {
|
|||
private ContainerCommandResponseProto chunkProcessHandler(
|
||||
ContainerCommandRequestProto msg) throws IOException {
|
||||
try {
|
||||
|
||||
switch (msg.getCmdType()) {
|
||||
case WriteChunk:
|
||||
return handleWriteChunk(msg);
|
||||
|
@ -327,4 +378,73 @@ public class Dispatcher implements ContainerDispatcher {
|
|||
return ChunkUtils.getChunkResponse(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Put Key handler.
|
||||
*
|
||||
* @param msg - Request.
|
||||
* @return - Response.
|
||||
* @throws IOException
|
||||
*/
|
||||
private ContainerCommandResponseProto handlePutKey(
|
||||
ContainerCommandRequestProto msg) throws IOException {
|
||||
if(!msg.hasPutKey()){
|
||||
LOG.debug("Malformed put key request. trace ID: {}",
|
||||
msg.getTraceID());
|
||||
return ContainerUtils.malformedRequest(msg);
|
||||
}
|
||||
Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline());
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
|
||||
Preconditions.checkNotNull(keyData);
|
||||
this.containerManager.getKeyManager().putKey(pipeline, keyData);
|
||||
return KeyUtils.getKeyResponse(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle Get Key.
|
||||
*
|
||||
* @param msg - Request.
|
||||
* @return - Response.
|
||||
* @throws IOException
|
||||
*/
|
||||
private ContainerCommandResponseProto handleGetKey(
|
||||
ContainerCommandRequestProto msg) throws IOException {
|
||||
if(!msg.hasGetKey()){
|
||||
LOG.debug("Malformed get key request. trace ID: {}",
|
||||
msg.getTraceID());
|
||||
return ContainerUtils.malformedRequest(msg);
|
||||
}
|
||||
KeyData keyData = KeyData.getFromProtoBuf(msg.getGetKey().getKeyData());
|
||||
Preconditions.checkNotNull(keyData);
|
||||
KeyData responseData =
|
||||
this.containerManager.getKeyManager().getKey(keyData);
|
||||
return KeyUtils.getKeyDataResponse(msg, responseData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle Delete Key.
|
||||
*
|
||||
* @param msg - Request.
|
||||
* @return - Response.
|
||||
* @throws IOException
|
||||
*/
|
||||
private ContainerCommandResponseProto handleDeleteKey(
|
||||
ContainerCommandRequestProto msg) throws IOException {
|
||||
if(!msg.hasDeleteKey()){
|
||||
LOG.debug("Malformed delete key request. trace ID: {}",
|
||||
msg.getTraceID());
|
||||
return ContainerUtils.malformedRequest(msg);
|
||||
}
|
||||
|
||||
Pipeline pipeline =
|
||||
Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline());
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
String keyName = msg.getDeleteKey().getName();
|
||||
Preconditions.checkNotNull(keyName);
|
||||
Preconditions.checkState(!keyName.isEmpty());
|
||||
|
||||
this.containerManager.getKeyManager().deleteKey(pipeline, keyName);
|
||||
return KeyUtils.getKeyResponse(msg);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
|
||||
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
|
||||
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Key Manager impl.
|
||||
*/
|
||||
public class KeyManagerImpl implements KeyManager {
|
||||
private static final float LOAD_FACTOR = 0.75f;
|
||||
private final ContainerManager containerManager;
|
||||
private final ContainerCache containerCache;
|
||||
|
||||
/**
|
||||
* Constructs a key Manager.
|
||||
*
|
||||
* @param containerManager - Container Manager.
|
||||
*/
|
||||
public KeyManagerImpl(ContainerManager containerManager, Configuration conf) {
|
||||
Preconditions.checkNotNull(containerManager);
|
||||
Preconditions.checkNotNull(conf);
|
||||
int cacheSize = conf.getInt(OzoneConfigKeys.DFS_OZONE_KEY_CACHE,
|
||||
OzoneConfigKeys.DFS_OZONE_KEY_CACHE_DEFAULT);
|
||||
this.containerManager = containerManager;
|
||||
containerCache = new ContainerCache(cacheSize, LOAD_FACTOR, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void putKey(Pipeline pipeline, KeyData data) throws IOException {
|
||||
containerManager.readLock();
|
||||
try {
|
||||
// We are not locking the key manager since LevelDb serializes all actions
|
||||
// against a single DB. We rely on DB level locking to avoid conflicts.
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
Preconditions.checkNotNull(pipeline.getContainerName());
|
||||
ContainerData cData = containerManager.readContainer(
|
||||
pipeline.getContainerName());
|
||||
LevelDBStore db = KeyUtils.getDB(cData, containerCache);
|
||||
Preconditions.checkNotNull(db);
|
||||
db.put(data.getKeyName().getBytes(KeyUtils.ENCODING), data
|
||||
.getProtoBufMessage().toByteArray());
|
||||
} finally {
|
||||
containerManager.readUnlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public KeyData getKey(KeyData data) throws IOException {
|
||||
containerManager.readLock();
|
||||
try {
|
||||
Preconditions.checkNotNull(data);
|
||||
Preconditions.checkNotNull(data.getContainerName());
|
||||
ContainerData cData = containerManager.readContainer(data
|
||||
.getContainerName());
|
||||
LevelDBStore db = KeyUtils.getDB(cData, containerCache);
|
||||
Preconditions.checkNotNull(db);
|
||||
byte[] kData = db.get(data.getKeyName().getBytes(KeyUtils.ENCODING));
|
||||
if(kData == null) {
|
||||
throw new IOException("Unable to find the key.");
|
||||
}
|
||||
ContainerProtos.KeyData keyData =
|
||||
ContainerProtos.KeyData.parseFrom(kData);
|
||||
return KeyData.getFromProtoBuf(keyData);
|
||||
} finally {
|
||||
containerManager.readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void deleteKey(Pipeline pipeline, String keyName) throws IOException {
|
||||
containerManager.readLock();
|
||||
try {
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
Preconditions.checkNotNull(pipeline.getContainerName());
|
||||
ContainerData cData = containerManager.readContainer(pipeline
|
||||
.getContainerName());
|
||||
LevelDBStore db = KeyUtils.getDB(cData, containerCache);
|
||||
Preconditions.checkNotNull(db);
|
||||
|
||||
// Note : There is a race condition here, since get and delete
|
||||
// are not atomic. Leaving it here since the impact is refusing
|
||||
// to delete a key which might have just gotten inserted after
|
||||
// the get check.
|
||||
|
||||
byte[] kData = db.get(keyName.getBytes(KeyUtils.ENCODING));
|
||||
if(kData == null) {
|
||||
throw new IOException("Unable to find the key.");
|
||||
}
|
||||
db.delete(keyName.getBytes(KeyUtils.ENCODING));
|
||||
} finally {
|
||||
containerManager.readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public List<KeyData> listKey(Pipeline pipeline, String prefix, String
|
||||
prevKey, int count) {
|
||||
// TODO :
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -30,7 +30,6 @@ import java.io.IOException;
|
|||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* Interface for container operations.
|
||||
*/
|
||||
|
@ -41,9 +40,9 @@ public interface ContainerManager extends RwLock {
|
|||
/**
|
||||
* Init call that sets up a container Manager.
|
||||
*
|
||||
* @param config - Configuration.
|
||||
* @param config - Configuration.
|
||||
* @param containerDirs - List of Metadata Container locations.
|
||||
* @param dataset - FSDataset.
|
||||
* @param dataset - FSDataset.
|
||||
* @throws IOException
|
||||
*/
|
||||
void init(Configuration config, List<Path> containerDirs,
|
||||
|
@ -74,8 +73,8 @@ public interface ContainerManager extends RwLock {
|
|||
* As simple interface for container Iterations.
|
||||
*
|
||||
* @param prevKey - Starting KeyValue
|
||||
* @param count - how many to return
|
||||
* @param data - Actual containerData
|
||||
* @param count - how many to return
|
||||
* @param data - Actual containerData
|
||||
* @throws IOException
|
||||
*/
|
||||
void listContainer(String prevKey, long count, List<ContainerData> data)
|
||||
|
@ -99,15 +98,30 @@ public interface ContainerManager extends RwLock {
|
|||
|
||||
/**
|
||||
* Sets the Chunk Manager.
|
||||
*
|
||||
* @param chunkManager - ChunkManager.
|
||||
*/
|
||||
void setChunkManager(ChunkManager chunkManager);
|
||||
|
||||
/**
|
||||
* Gets the Chunk Manager.
|
||||
* @return ChunkManager.
|
||||
*
|
||||
* @return ChunkManager.
|
||||
*/
|
||||
ChunkManager getChunkManager();
|
||||
|
||||
/**
|
||||
* Sets the Key Manager.
|
||||
*
|
||||
* @param keyManager - Key Manager.
|
||||
*/
|
||||
void setKeyManager(KeyManager keyManager);
|
||||
|
||||
/**
|
||||
* Gets the Key Manager.
|
||||
*
|
||||
* @return KeyManager.
|
||||
*/
|
||||
KeyManager getKeyManager();
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* KeyManager deals with Key Operations in the container Level.
|
||||
*/
|
||||
public interface KeyManager {
|
||||
/**
|
||||
* Puts or overwrites a key.
|
||||
* @param pipeline - Pipeline.
|
||||
* @param data - Key Data.
|
||||
*/
|
||||
void putKey(Pipeline pipeline, KeyData data) throws IOException;
|
||||
|
||||
/**
|
||||
* Gets an existing key.
|
||||
* @param data - Key Data.
|
||||
* @return Key Data.
|
||||
*/
|
||||
KeyData getKey(KeyData data) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes an existing Key.
|
||||
* @param pipeline - Pipeline.
|
||||
* @param keyName Key Data.
|
||||
*/
|
||||
void deleteKey(Pipeline pipeline, String keyName) throws IOException;
|
||||
|
||||
/**
|
||||
* List keys in a container.
|
||||
* @param pipeline - pipeline.
|
||||
* @param prefix - Prefix in needed.
|
||||
* @param prevKey - Key to Start from, EMPTY_STRING to begin.
|
||||
* @param count - Number of keys to return.
|
||||
* @return List of Keys that match the criteria.
|
||||
*/
|
||||
List<KeyData> listKey(Pipeline pipeline, String prefix, String prevKey, int
|
||||
count);
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.utils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.collections.map.LRUMap;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* container cache is a LRUMap that maintains the DB handles.
|
||||
*/
|
||||
public class ContainerCache extends LRUMap {
|
||||
static final Log LOG = LogFactory.getLog(ContainerCache.class);
|
||||
private final Lock lock = new ReentrantLock();
|
||||
|
||||
/**
|
||||
* Constructs a cache that holds DBHandle references.
|
||||
*/
|
||||
public ContainerCache(int maxSize, float loadFactor, boolean
|
||||
scanUntilRemovable) {
|
||||
super(maxSize, loadFactor, scanUntilRemovable);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected boolean removeLRU(LinkEntry entry) {
|
||||
lock.lock();
|
||||
try {
|
||||
LevelDBStore db = (LevelDBStore) entry.getValue();
|
||||
db.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error closing DB. Container: " + entry.getKey().toString(), e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a DB handle if available, null otherwise.
|
||||
*
|
||||
* @param containerName - Name of the container.
|
||||
* @return OzoneLevelDBStore.
|
||||
*/
|
||||
public LevelDBStore getDB(String containerName) {
|
||||
Preconditions.checkNotNull(containerName);
|
||||
Preconditions.checkState(!containerName.isEmpty());
|
||||
lock.lock();
|
||||
try {
|
||||
return (LevelDBStore) this.get(containerName);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new DB to the cache.
|
||||
*
|
||||
* @param containerName - Name of the container
|
||||
* @param db - DB handle
|
||||
*/
|
||||
public void putDB(String containerName, LevelDBStore db) {
|
||||
Preconditions.checkNotNull(containerName);
|
||||
Preconditions.checkState(!containerName.isEmpty());
|
||||
lock.lock();
|
||||
try {
|
||||
this.put(containerName, db);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove an entry from the cache.
|
||||
*
|
||||
* @param containerName - Name of the container.
|
||||
*/
|
||||
public void removeDB(String containerName) {
|
||||
Preconditions.checkNotNull(containerName);
|
||||
Preconditions.checkState(!containerName.isEmpty());
|
||||
lock.lock();
|
||||
try {
|
||||
this.remove(containerName);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import org.fusesource.leveldbjni.JniDBFactory;
|
|||
import org.iq80.leveldb.DB;
|
||||
import org.iq80.leveldb.DBIterator;
|
||||
import org.iq80.leveldb.Options;
|
||||
import org.iq80.leveldb.WriteOptions;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -58,7 +59,9 @@ public class LevelDBStore {
|
|||
* @param value - value
|
||||
*/
|
||||
public void put(byte[] key, byte[] value) {
|
||||
db.put(key, value);
|
||||
WriteOptions options = new WriteOptions();
|
||||
options.sync(true);
|
||||
db.put(key, value, options);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -24,9 +24,11 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
|
|||
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
|
||||
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -51,6 +53,7 @@ public class OzoneContainer {
|
|||
private final ContainerManager manager;
|
||||
private final XceiverServer server;
|
||||
private final ChunkManager chunkManager;
|
||||
private final KeyManager keyManager;
|
||||
|
||||
/**
|
||||
* Creates a network endpoint and enables Ozone container.
|
||||
|
@ -80,6 +83,9 @@ public class OzoneContainer {
|
|||
this.chunkManager = new ChunkManagerImpl(manager);
|
||||
manager.setChunkManager(this.chunkManager);
|
||||
|
||||
this.keyManager = new KeyManagerImpl(manager, ozoneConfig);
|
||||
manager.setKeyManager(this.keyManager);
|
||||
|
||||
this.dispatcher = new Dispatcher(manager);
|
||||
server = new XceiverServer(this.ozoneConfig, this.dispatcher);
|
||||
}
|
||||
|
|
|
@ -49,25 +49,24 @@ import "hdfs.proto";
|
|||
* 5. ListContainer - Returns the list of containers on this
|
||||
* datanode. This will be used by tests and tools.
|
||||
*
|
||||
* 6. CreateKey - Given a valid container, creates a key.
|
||||
* 6. PutKey - Given a valid container, creates a key.
|
||||
*
|
||||
* 7. ReadKey - Allows user to read the metadata of a Key.
|
||||
* 7. GetKey - Allows user to read the metadata of a Key.
|
||||
*
|
||||
* 8. UpdateKey - Updates the metadata of a Key.
|
||||
* 8. DeleteKey - Deletes a given key.
|
||||
*
|
||||
* 9. DeleteKey - Deletes a given key.
|
||||
*
|
||||
* 10. ListKey - Returns a list of keys that are present inside
|
||||
* 9. ListKey - Returns a list of keys that are present inside
|
||||
* a given container.
|
||||
*
|
||||
* 11. ReadChunk - Allows us to read a chunk.
|
||||
* 10. ReadChunk - Allows us to read a chunk.
|
||||
*
|
||||
* 12. DeleteChunk - Delete an unused chunk.
|
||||
* 11. DeleteChunk - Delete an unused chunk.
|
||||
*
|
||||
* 13. WriteChunk - Allows us to write a chunk
|
||||
* 12. WriteChunk - Allows us to write a chunk
|
||||
*
|
||||
* 14. ListChunk - Given a Container/Key returns the list of Chunks.
|
||||
* 13. ListChunk - Given a Container/Key returns the list of Chunks.
|
||||
*
|
||||
* 14. CompactChunk - Re-writes a chunk based on Offsets.
|
||||
*/
|
||||
|
||||
enum Type {
|
||||
|
@ -77,16 +76,16 @@ enum Type {
|
|||
DeleteContainer = 4;
|
||||
ListContainer = 5;
|
||||
|
||||
CreateKey = 6;
|
||||
Readkey = 7;
|
||||
UpdateKey = 8;
|
||||
DeleteKey = 9;
|
||||
ListKey = 10;
|
||||
PutKey = 6;
|
||||
GetKey = 7;
|
||||
DeleteKey = 8;
|
||||
ListKey = 9;
|
||||
|
||||
ReadChunk = 11;
|
||||
DeleteChunk = 12;
|
||||
WriteChunk = 13;
|
||||
ListChunk = 14;
|
||||
ReadChunk = 10;
|
||||
DeleteChunk = 11;
|
||||
WriteChunk = 12;
|
||||
ListChunk = 13;
|
||||
CompactChunk = 14;
|
||||
}
|
||||
|
||||
|
||||
|
@ -95,7 +94,6 @@ enum Result {
|
|||
UNSUPPORTED_REQUEST = 2;
|
||||
MALFORMED_REQUEST = 3;
|
||||
CONTAINER_INTERNAL_ERROR = 4;
|
||||
|
||||
}
|
||||
|
||||
message ContainerCommandRequestProto {
|
||||
|
@ -115,16 +113,15 @@ message ContainerCommandRequestProto {
|
|||
optional DeleteContainerRequestProto deleteContainer = 6;
|
||||
optional ListContainerRequestProto listContainer = 7;
|
||||
|
||||
optional CreateKeyRequestProto createKey = 8;
|
||||
optional ReadKeyRequestProto readKey = 9;
|
||||
optional UpdateKeyRequestProto updateKey = 10;
|
||||
optional DeleteKeyRequestProto deleteKey = 11;
|
||||
optional ListKeyRequestProto listKey = 12;
|
||||
optional PutKeyRequestProto putKey = 8;
|
||||
optional GetKeyRequestProto getKey = 9;
|
||||
optional DeleteKeyRequestProto deleteKey = 10;
|
||||
optional ListKeyRequestProto listKey = 11;
|
||||
|
||||
optional ReadChunkRequestProto readChunk = 13;
|
||||
optional WriteChunkRequestProto writeChunk = 14;
|
||||
optional DeleteChunkRequestProto deleteChunk = 15;
|
||||
optional ListChunkRequestProto listChunk = 16;
|
||||
optional ReadChunkRequestProto readChunk = 12;
|
||||
optional WriteChunkRequestProto writeChunk = 13;
|
||||
optional DeleteChunkRequestProto deleteChunk = 14;
|
||||
optional ListChunkRequestProto listChunk = 15;
|
||||
}
|
||||
|
||||
message ContainerCommandResponseProto {
|
||||
|
@ -137,16 +134,15 @@ message ContainerCommandResponseProto {
|
|||
optional DeleteContainerResponseProto deleteContainer = 6;
|
||||
optional ListContainerResponseProto listContainer = 7;
|
||||
|
||||
optional CreateKeyResponseProto createKey = 8;
|
||||
optional ReadKeyResponeProto readKey = 9;
|
||||
optional UpdateKeyResponseProto updateKey = 10;
|
||||
optional DeleteKeyResponeProto deleteKey = 11;
|
||||
optional ListKeyResponeProto listKey = 12;
|
||||
optional PutKeyResponseProto putKey = 8;
|
||||
optional GetKeyResponseProto getKey = 9;
|
||||
optional DeleteKeyResponseProto deleteKey = 10;
|
||||
optional ListKeyResponseProto listKey = 11;
|
||||
|
||||
optional WriteChunkReponseProto writeChunk = 13;
|
||||
optional ReadChunkReponseProto readChunk = 14;
|
||||
optional DeleteChunkResponseProto deleteChunk = 15;
|
||||
optional ListChunkResponseProto listChunk = 16;
|
||||
optional WriteChunkResponseProto writeChunk = 12;
|
||||
optional ReadChunkResponseProto readChunk = 13;
|
||||
optional DeleteChunkResponseProto deleteChunk = 14;
|
||||
optional ListChunkResponseProto listChunk = 15;
|
||||
|
||||
required Result result = 17;
|
||||
optional string message = 18;
|
||||
|
@ -222,37 +218,30 @@ message ListContainerResponseProto {
|
|||
}
|
||||
|
||||
|
||||
message ContainerKeyData {
|
||||
optional string containerName = 1;
|
||||
message KeyData {
|
||||
required string containerName = 1;
|
||||
required string name = 2;
|
||||
repeated KeyValue metadata = 3;
|
||||
optional int64 flags = 3; // for future use.
|
||||
repeated KeyValue metadata = 4;
|
||||
repeated ChunkInfo chunks = 5;
|
||||
}
|
||||
|
||||
// Key Messages.
|
||||
message CreateKeyRequestProto {
|
||||
message PutKeyRequestProto {
|
||||
required Pipeline pipeline = 1;
|
||||
required ContainerKeyData containerKeyData = 2;
|
||||
required KeyData keyData = 2;
|
||||
}
|
||||
|
||||
message CreateKeyResponseProto {
|
||||
message PutKeyResponseProto {
|
||||
}
|
||||
|
||||
message ReadKeyRequestProto {
|
||||
message GetKeyRequestProto {
|
||||
required Pipeline pipeline = 1;
|
||||
required ContainerKeyData containerKeyData = 2;
|
||||
required KeyData keyData = 2;
|
||||
}
|
||||
|
||||
message ReadKeyResponeProto {
|
||||
repeated KeyValue metadata = 1;
|
||||
repeated ChunkInfo chunkData = 2;
|
||||
}
|
||||
|
||||
message UpdateKeyRequestProto {
|
||||
required Pipeline pipeline = 1;
|
||||
required ContainerKeyData containerKeyData = 2;
|
||||
}
|
||||
|
||||
message UpdateKeyResponseProto {
|
||||
message GetKeyResponseProto {
|
||||
required KeyData keyData = 1;
|
||||
}
|
||||
|
||||
|
||||
|
@ -261,17 +250,19 @@ message DeleteKeyRequestProto {
|
|||
required string name = 2;
|
||||
}
|
||||
|
||||
message DeleteKeyResponeProto {
|
||||
message DeleteKeyResponseProto {
|
||||
}
|
||||
|
||||
message ListKeyRequestProto {
|
||||
required Pipeline pipeline = 1;
|
||||
required string prevKey = 2;
|
||||
required uint32 count = 3;
|
||||
optional string prefix = 2; // if specified returns keys that match prefix.
|
||||
required string prevKey = 3;
|
||||
required uint32 count = 4;
|
||||
|
||||
}
|
||||
|
||||
message ListKeyResponeProto {
|
||||
repeated ContainerKeyData containerKeyData = 1;
|
||||
message ListKeyResponseProto {
|
||||
repeated KeyData keyData = 1;
|
||||
}
|
||||
|
||||
// Chunk Operations
|
||||
|
@ -291,7 +282,7 @@ message WriteChunkRequestProto {
|
|||
required bytes data = 4;
|
||||
}
|
||||
|
||||
message WriteChunkReponseProto {
|
||||
message WriteChunkResponseProto {
|
||||
}
|
||||
|
||||
message ReadChunkRequestProto {
|
||||
|
@ -300,7 +291,7 @@ message ReadChunkRequestProto {
|
|||
required ChunkInfo chunkData = 3;
|
||||
}
|
||||
|
||||
message ReadChunkReponseProto {
|
||||
message ReadChunkResponseProto {
|
||||
required Pipeline pipeline = 1;
|
||||
required ChunkInfo chunkData = 2;
|
||||
required bytes data = 3;
|
||||
|
|
|
@ -20,8 +20,6 @@ package org.apache.hadoop.ozone.container;
|
|||
|
||||
import com.google.protobuf.ByteString;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.io.IOExceptionWithCause;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||
.ContainerCommandRequestProto;
|
||||
|
@ -31,11 +29,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
|
||||
|
@ -231,7 +233,7 @@ public class ContainerTestHelper {
|
|||
* @return ContainerCommandRequestProto.
|
||||
*/
|
||||
public static ContainerCommandResponseProto
|
||||
getCreateContainerResponse(ContainerCommandRequestProto request) throws
|
||||
getCreateContainerResponse(ContainerCommandRequestProto request) throws
|
||||
IOException {
|
||||
ContainerProtos.CreateContainerResponseProto.Builder createResponse =
|
||||
ContainerProtos.CreateContainerResponseProto.newBuilder();
|
||||
|
@ -244,4 +246,89 @@ public class ContainerTestHelper {
|
|||
response.setResult(ContainerProtos.Result.SUCCESS);
|
||||
return response.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the PutKeyRequest for test purpose.
|
||||
*
|
||||
* @param writeRequest - Write Chunk Request.
|
||||
* @return - Request
|
||||
*/
|
||||
public static ContainerCommandRequestProto getPutKeyRequest(
|
||||
ContainerProtos.WriteChunkRequestProto writeRequest) {
|
||||
ContainerProtos.PutKeyRequestProto.Builder putRequest =
|
||||
ContainerProtos.PutKeyRequestProto.newBuilder();
|
||||
|
||||
putRequest.setPipeline(writeRequest.getPipeline());
|
||||
KeyData keyData = new KeyData(writeRequest.getPipeline().getContainerName(),
|
||||
writeRequest.getKeyName());
|
||||
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
|
||||
newList.add(writeRequest.getChunkData());
|
||||
keyData.setChunks(newList);
|
||||
putRequest.setKeyData(keyData.getProtoBufMessage());
|
||||
|
||||
ContainerCommandRequestProto.Builder request =
|
||||
ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.PutKey);
|
||||
request.setPutKey(putRequest);
|
||||
return request.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a GetKeyRequest for test purpose.
|
||||
*
|
||||
* @param putKeyRequest - putKeyRequest.
|
||||
* @return - Request
|
||||
*/
|
||||
public static ContainerCommandRequestProto getKeyRequest(
|
||||
ContainerProtos.PutKeyRequestProto putKeyRequest) {
|
||||
ContainerProtos.GetKeyRequestProto.Builder getRequest =
|
||||
ContainerProtos.GetKeyRequestProto.newBuilder();
|
||||
ContainerProtos.KeyData.Builder keyData = ContainerProtos.KeyData
|
||||
.newBuilder();
|
||||
keyData.setContainerName(putKeyRequest.getPipeline().getContainerName());
|
||||
keyData.setName(putKeyRequest.getKeyData().getName());
|
||||
getRequest.setKeyData(keyData);
|
||||
getRequest.setPipeline(putKeyRequest.getPipeline());
|
||||
|
||||
ContainerCommandRequestProto.Builder request =
|
||||
ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.GetKey);
|
||||
request.setGetKey(getRequest);
|
||||
return request.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the response against the request.
|
||||
* @param request - Request
|
||||
* @param response - Response
|
||||
*/
|
||||
public static void verifyGetKey(ContainerCommandRequestProto request,
|
||||
ContainerCommandResponseProto response) {
|
||||
Assert.assertEquals(request.getTraceID(), response.getTraceID());
|
||||
Assert.assertEquals(response.getResult(), ContainerProtos.Result.SUCCESS);
|
||||
ContainerProtos.PutKeyRequestProto putKey = request.getPutKey();
|
||||
ContainerProtos. GetKeyRequestProto getKey = request.getGetKey();
|
||||
Assert.assertEquals(putKey.getKeyData().getChunksCount(),
|
||||
getKey.getKeyData().getChunksCount());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param putKeyRequest - putKeyRequest.
|
||||
* @return - Request
|
||||
*/
|
||||
public static ContainerCommandRequestProto getDeleteKeyRequest(
|
||||
ContainerProtos.PutKeyRequestProto putKeyRequest) {
|
||||
ContainerProtos.DeleteKeyRequestProto.Builder delRequest =
|
||||
ContainerProtos.DeleteKeyRequestProto.newBuilder();
|
||||
delRequest.setPipeline(putKeyRequest.getPipeline());
|
||||
delRequest.setName(putKeyRequest.getKeyData().getName());
|
||||
ContainerCommandRequestProto.Builder request =
|
||||
ContainerCommandRequestProto.newBuilder();
|
||||
request.setCmdType(ContainerProtos.Type.DeleteKey);
|
||||
request.setDeleteKey(delRequest);
|
||||
return request.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.impl;
|
|||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
|
@ -28,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
|
|||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
|
@ -60,6 +62,7 @@ import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
|
|||
import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
|
||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper
|
||||
.setDataChecksum;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
|
@ -70,6 +73,7 @@ public class TestContainerPersistence {
|
|||
static String path;
|
||||
static ContainerManagerImpl containerManager;
|
||||
static ChunkManagerImpl chunkManager;
|
||||
static KeyManagerImpl keyManager;
|
||||
static OzoneConfiguration conf;
|
||||
static FsDatasetSpi fsDataSet;
|
||||
static MiniDFSCluster cluster;
|
||||
|
@ -103,6 +107,8 @@ public class TestContainerPersistence {
|
|||
containerManager = new ContainerManagerImpl();
|
||||
chunkManager = new ChunkManagerImpl(containerManager);
|
||||
containerManager.setChunkManager(chunkManager);
|
||||
keyManager = new KeyManagerImpl(containerManager, conf);
|
||||
containerManager.setKeyManager(keyManager);
|
||||
|
||||
}
|
||||
|
||||
|
@ -176,9 +182,11 @@ public class TestContainerPersistence {
|
|||
ContainerData data = new ContainerData(containerName);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName), data);
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName),
|
||||
data);
|
||||
try {
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName), data);
|
||||
containerManager.createContainer(createSingleNodePipeline
|
||||
(containerName), data);
|
||||
fail("Expected Exception not thrown.");
|
||||
} catch (IOException ex) {
|
||||
Assert.assertNotNull(ex);
|
||||
|
@ -194,12 +202,14 @@ public class TestContainerPersistence {
|
|||
ContainerData data = new ContainerData(containerName1);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName1), data);
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName1)
|
||||
, data);
|
||||
|
||||
data = new ContainerData(containerName2);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName2), data);
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName2)
|
||||
, data);
|
||||
|
||||
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
|
@ -218,7 +228,8 @@ public class TestContainerPersistence {
|
|||
data = new ContainerData(containerName1);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName1), data);
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName1)
|
||||
, data);
|
||||
|
||||
// Assert we still have both containers.
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
|
@ -246,7 +257,8 @@ public class TestContainerPersistence {
|
|||
ContainerData data = new ContainerData(containerName);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName), data);
|
||||
containerManager.createContainer(createSingleNodePipeline
|
||||
(containerName), data);
|
||||
testMap.put(containerName, data);
|
||||
}
|
||||
|
||||
|
@ -271,19 +283,10 @@ public class TestContainerPersistence {
|
|||
Assert.assertTrue(testMap.isEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a single chunk.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws NoSuchAlgorithmException
|
||||
*/
|
||||
@Test
|
||||
public void testWriteChunk() throws IOException, NoSuchAlgorithmException {
|
||||
private ChunkInfo writeChunkHelper(String containerName, String keyName,
|
||||
Pipeline pipeline) throws IOException,
|
||||
NoSuchAlgorithmException {
|
||||
final int datalen = 1024;
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||
|
||||
pipeline.setContainerName(containerName);
|
||||
ContainerData cData = new ContainerData(containerName);
|
||||
cData.addMetadata("VOLUME", "shire");
|
||||
|
@ -293,6 +296,23 @@ public class TestContainerPersistence {
|
|||
byte[] data = getData(datalen);
|
||||
setDataChecksum(info, data);
|
||||
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||
return info;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a single chunk.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws NoSuchAlgorithmException
|
||||
*/
|
||||
@Test
|
||||
public void testWriteChunk() throws IOException,
|
||||
NoSuchAlgorithmException {
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||
writeChunkHelper(containerName, keyName, pipeline);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -389,7 +409,7 @@ public class TestContainerPersistence {
|
|||
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||
try {
|
||||
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||
} catch(IOException ex) {
|
||||
} catch (IOException ex) {
|
||||
Assert.assertTrue(ex.getMessage().contains(
|
||||
"Rejecting write chunk request. OverWrite flag required."));
|
||||
}
|
||||
|
@ -469,4 +489,116 @@ public class TestContainerPersistence {
|
|||
exception.expectMessage("Unable to find the chunk file.");
|
||||
chunkManager.readChunk(pipeline, keyName, info);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests a put key and read key.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws NoSuchAlgorithmException
|
||||
*/
|
||||
@Test
|
||||
public void testPutKey() throws IOException, NoSuchAlgorithmException {
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||
ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
|
||||
KeyData keyData = new KeyData(containerName, keyName);
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
|
||||
chunkList.add(info.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
keyManager.putKey(pipeline, keyData);
|
||||
KeyData readKeyData = keyManager.getKey(keyData);
|
||||
ChunkInfo readChunk =
|
||||
ChunkInfo.getFromProtoBuf(readKeyData.getChunks().get(0));
|
||||
Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests a put key and read key.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws NoSuchAlgorithmException
|
||||
*/
|
||||
@Test
|
||||
public void testPutKeyWithLotsOfChunks() throws IOException,
|
||||
NoSuchAlgorithmException {
|
||||
final int chunkCount = 1024;
|
||||
final int datalen = 1024;
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||
List<ChunkInfo> chunkList = new LinkedList<>();
|
||||
ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
|
||||
chunkList.add(info);
|
||||
for (int x = 1; x < chunkCount; x++) {
|
||||
info = getChunk(keyName, x, x * datalen, datalen);
|
||||
byte[] data = getData(datalen);
|
||||
setDataChecksum(info, data);
|
||||
chunkManager.writeChunk(pipeline, keyName, info, data);
|
||||
chunkList.add(info);
|
||||
}
|
||||
|
||||
KeyData keyData = new KeyData(containerName, keyName);
|
||||
List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
|
||||
for (ChunkInfo i : chunkList) {
|
||||
chunkProtoList.add(i.getProtoBufMessage());
|
||||
}
|
||||
keyData.setChunks(chunkProtoList);
|
||||
keyManager.putKey(pipeline, keyData);
|
||||
KeyData readKeyData = keyManager.getKey(keyData);
|
||||
ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1);
|
||||
ChunkInfo readChunk =
|
||||
ChunkInfo.getFromProtoBuf(readKeyData.getChunks().get(readKeyData
|
||||
.getChunks().size() - 1));
|
||||
Assert.assertEquals(lastChunk.getChecksum(), readChunk.getChecksum());
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes a key and tries to read it back.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws NoSuchAlgorithmException
|
||||
*/
|
||||
@Test
|
||||
public void testDeleteKey() throws IOException, NoSuchAlgorithmException {
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||
ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
|
||||
KeyData keyData = new KeyData(containerName, keyName);
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
|
||||
chunkList.add(info.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
keyManager.putKey(pipeline, keyData);
|
||||
keyManager.deleteKey(pipeline, keyName);
|
||||
exception.expect(IOException.class);
|
||||
exception.expectMessage("Unable to find the key.");
|
||||
keyManager.getKey(keyData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to Deletes a key twice.
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws NoSuchAlgorithmException
|
||||
*/
|
||||
@Test
|
||||
public void testDeleteKeyTwice() throws IOException,
|
||||
NoSuchAlgorithmException {
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
Pipeline pipeline = createSingleNodePipeline(containerName);
|
||||
ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
|
||||
KeyData keyData = new KeyData(containerName, keyName);
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
|
||||
chunkList.add(info.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
keyManager.putKey(pipeline, keyData);
|
||||
keyManager.deleteKey(pipeline, keyName);
|
||||
exception.expect(IOException.class);
|
||||
exception.expectMessage("Unable to find the key.");
|
||||
keyManager.deleteKey(pipeline, keyName);
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -135,6 +135,29 @@ public class TestOzoneContainer {
|
|||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Put Key
|
||||
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
|
||||
ContainerTestHelper.getPutKeyRequest(writeChunkRequest.getWriteChunk());
|
||||
|
||||
response = client.sendCommand(putKeyRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Get Key
|
||||
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
|
||||
response = client.sendCommand(request);
|
||||
ContainerTestHelper.verifyGetKey(request, response);
|
||||
|
||||
|
||||
// Delete Key
|
||||
request =
|
||||
ContainerTestHelper.getDeleteKeyRequest(putKeyRequest.getPutKey());
|
||||
response = client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
//Delete Chunk
|
||||
request = ContainerTestHelper.getDeleteChunkRequest(writeChunkRequest
|
||||
.getWriteChunk());
|
||||
|
|
Loading…
Reference in New Issue