Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou"
This reverts commit bdc45bef64
.
This commit is contained in:
parent
8a07026e77
commit
d740a90260
|
@ -1247,6 +1247,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||||
/**
|
/**
|
||||||
* Renames Path src to Path dst
|
* Renames Path src to Path dst
|
||||||
* <ul>
|
* <ul>
|
||||||
|
* <li
|
||||||
* <li>Fails if src is a file and dst is a directory.
|
* <li>Fails if src is a file and dst is a directory.
|
||||||
* <li>Fails if src is a directory and dst is a file.
|
* <li>Fails if src is a directory and dst is a file.
|
||||||
* <li>Fails if the parent of dst does not exist or is a file.
|
* <li>Fails if the parent of dst does not exist or is a file.
|
||||||
|
|
|
@ -119,8 +119,7 @@ public class Client implements AutoCloseable {
|
||||||
|
|
||||||
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
|
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
|
||||||
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
|
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
|
||||||
private static final ThreadLocal<Future<?>>
|
private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>();
|
||||||
RETURN_RPC_RESPONSE = new ThreadLocal<>();
|
|
||||||
private static final ThreadLocal<Boolean> asynchronousMode =
|
private static final ThreadLocal<Boolean> asynchronousMode =
|
||||||
new ThreadLocal<Boolean>() {
|
new ThreadLocal<Boolean>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -131,8 +130,8 @@ public class Client implements AutoCloseable {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Unstable
|
@Unstable
|
||||||
public static <T> Future<T> getReturnRpcResponse() {
|
public static <T> Future<T> getReturnValue() {
|
||||||
return (Future<T>) RETURN_RPC_RESPONSE.get();
|
return (Future<T>) returnValue.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Set call id and retry count for the next call. */
|
/** Set call id and retry count for the next call. */
|
||||||
|
@ -1398,7 +1397,7 @@ public class Client implements AutoCloseable {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
RETURN_RPC_RESPONSE.set(returnFuture);
|
returnValue.set(returnFuture);
|
||||||
return null;
|
return null;
|
||||||
} else {
|
} else {
|
||||||
return getRpcResponse(call, connection);
|
return getRpcResponse(call, connection);
|
||||||
|
@ -1412,7 +1411,7 @@ public class Client implements AutoCloseable {
|
||||||
* synchronous mode.
|
* synchronous mode.
|
||||||
*/
|
*/
|
||||||
@Unstable
|
@Unstable
|
||||||
public static boolean isAsynchronousMode() {
|
static boolean isAsynchronousMode() {
|
||||||
return asynchronousMode.get();
|
return asynchronousMode.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,9 +26,7 @@ import java.lang.reflect.Method;
|
||||||
import java.lang.reflect.Proxy;
|
import java.lang.reflect.Proxy;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
@ -37,7 +35,6 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.DataOutputOutputStream;
|
import org.apache.hadoop.io.DataOutputOutputStream;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
@ -70,8 +67,6 @@ import com.google.protobuf.TextFormat;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class ProtobufRpcEngine implements RpcEngine {
|
public class ProtobufRpcEngine implements RpcEngine {
|
||||||
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
|
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
|
||||||
private static final ThreadLocal<Callable<?>>
|
|
||||||
RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
|
|
||||||
|
|
||||||
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
static { // Register the rpcRequest deserializer for WritableRpcEngine
|
||||||
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
org.apache.hadoop.ipc.Server.registerProtocolEngine(
|
||||||
|
@ -81,12 +76,6 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
|
|
||||||
private static final ClientCache CLIENTS = new ClientCache();
|
private static final ClientCache CLIENTS = new ClientCache();
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Unstable
|
|
||||||
public static <T> Callable<T> getReturnMessageCallback() {
|
|
||||||
return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
|
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
|
||||||
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
|
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
|
||||||
SocketFactory factory, int rpcTimeout) throws IOException {
|
SocketFactory factory, int rpcTimeout) throws IOException {
|
||||||
|
@ -200,7 +189,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
* the server.
|
* the server.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Object invoke(Object proxy, final Method method, Object[] args)
|
public Object invoke(Object proxy, Method method, Object[] args)
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
long startTime = 0;
|
long startTime = 0;
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -262,23 +251,6 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
|
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Client.isAsynchronousMode()) {
|
|
||||||
final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
|
|
||||||
Callable<Message> callback = new Callable<Message>() {
|
|
||||||
@Override
|
|
||||||
public Message call() throws Exception {
|
|
||||||
return getReturnMessage(method, frrw.get());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
RETURN_MESSAGE_CALLBACK.set(callback);
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
return getReturnMessage(method, val);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Message getReturnMessage(final Method method,
|
|
||||||
final RpcResponseWrapper rrw) throws ServiceException {
|
|
||||||
Message prototype = null;
|
Message prototype = null;
|
||||||
try {
|
try {
|
||||||
prototype = getReturnProtoType(method);
|
prototype = getReturnProtoType(method);
|
||||||
|
@ -288,7 +260,7 @@ public class ProtobufRpcEngine implements RpcEngine {
|
||||||
Message returnMessage;
|
Message returnMessage;
|
||||||
try {
|
try {
|
||||||
returnMessage = prototype.newBuilderForType()
|
returnMessage = prototype.newBuilderForType()
|
||||||
.mergeFrom(rrw.theResponseRead).build();
|
.mergeFrom(val.theResponseRead).build();
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(Thread.currentThread().getId() + ": Response <- " +
|
LOG.trace(Thread.currentThread().getId() + ": Response <- " +
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class TestAsyncIPC {
|
||||||
try {
|
try {
|
||||||
final long param = TestIPC.RANDOM.nextLong();
|
final long param = TestIPC.RANDOM.nextLong();
|
||||||
TestIPC.call(client, param, server, conf);
|
TestIPC.call(client, param, server, conf);
|
||||||
Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
|
Future<LongWritable> returnFuture = Client.getReturnValue();
|
||||||
returnFutures.put(i, returnFuture);
|
returnFutures.put(i, returnFuture);
|
||||||
expectedValues.put(i, param);
|
expectedValues.put(i, param);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -1,110 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
||||||
import org.apache.hadoop.fs.Options;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
|
|
||||||
import org.apache.hadoop.ipc.Client;
|
|
||||||
|
|
||||||
import com.google.common.util.concurrent.AbstractFuture;
|
|
||||||
|
|
||||||
/****************************************************************
|
|
||||||
* Implementation of the asynchronous distributed file system.
|
|
||||||
* This instance of this class is the way end-user code interacts
|
|
||||||
* with a Hadoop DistributedFileSystem in an asynchronous manner.
|
|
||||||
*
|
|
||||||
*****************************************************************/
|
|
||||||
@Unstable
|
|
||||||
public class AsyncDistributedFileSystem {
|
|
||||||
|
|
||||||
private final DistributedFileSystem dfs;
|
|
||||||
|
|
||||||
AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
|
|
||||||
this.dfs = dfs;
|
|
||||||
}
|
|
||||||
|
|
||||||
static <T> Future<T> getReturnValue() {
|
|
||||||
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
|
|
||||||
.getReturnValueCallback();
|
|
||||||
Future<T> returnFuture = new AbstractFuture<T>() {
|
|
||||||
public T get() throws InterruptedException, ExecutionException {
|
|
||||||
try {
|
|
||||||
set(returnValueCallback.call());
|
|
||||||
} catch (Exception e) {
|
|
||||||
setException(e);
|
|
||||||
}
|
|
||||||
return super.get();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return returnFuture;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Renames Path src to Path dst
|
|
||||||
* <ul>
|
|
||||||
* <li>Fails if src is a file and dst is a directory.
|
|
||||||
* <li>Fails if src is a directory and dst is a file.
|
|
||||||
* <li>Fails if the parent of dst does not exist or is a file.
|
|
||||||
* </ul>
|
|
||||||
* <p>
|
|
||||||
* If OVERWRITE option is not passed as an argument, rename fails if the dst
|
|
||||||
* already exists.
|
|
||||||
* <p>
|
|
||||||
* If OVERWRITE option is passed as an argument, rename overwrites the dst if
|
|
||||||
* it is a file or an empty directory. Rename fails if dst is a non-empty
|
|
||||||
* directory.
|
|
||||||
* <p>
|
|
||||||
* Note that atomicity of rename is dependent on the file system
|
|
||||||
* implementation. Please refer to the file system documentation for details.
|
|
||||||
* This default implementation is non atomic.
|
|
||||||
*
|
|
||||||
* @param src
|
|
||||||
* path to be renamed
|
|
||||||
* @param dst
|
|
||||||
* new path after rename
|
|
||||||
* @throws IOException
|
|
||||||
* on failure
|
|
||||||
* @return an instance of Future, #get of which is invoked to wait for
|
|
||||||
* asynchronous call being finished.
|
|
||||||
*/
|
|
||||||
public Future<Void> rename(Path src, Path dst,
|
|
||||||
final Options.Rename... options) throws IOException {
|
|
||||||
dfs.getFsStatistics().incrementWriteOps(1);
|
|
||||||
|
|
||||||
final Path absSrc = dfs.fixRelativePart(src);
|
|
||||||
final Path absDst = dfs.fixRelativePart(dst);
|
|
||||||
|
|
||||||
final boolean isAsync = Client.isAsynchronousMode();
|
|
||||||
Client.setAsynchronousMode(true);
|
|
||||||
try {
|
|
||||||
dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
|
|
||||||
options);
|
|
||||||
return getReturnValue();
|
|
||||||
} finally {
|
|
||||||
Client.setAsynchronousMode(isAsync);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -31,7 +31,6 @@ import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.BlockStorageLocation;
|
import org.apache.hadoop.fs.BlockStorageLocation;
|
||||||
|
@ -206,7 +205,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
* @return path component of {file}
|
* @return path component of {file}
|
||||||
* @throws IllegalArgumentException if URI does not belong to this DFS
|
* @throws IllegalArgumentException if URI does not belong to this DFS
|
||||||
*/
|
*/
|
||||||
String getPathName(Path file) {
|
private String getPathName(Path file) {
|
||||||
checkPath(file);
|
checkPath(file);
|
||||||
String result = file.toUri().getPath();
|
String result = file.toUri().getPath();
|
||||||
if (!DFSUtilClient.isValidName(result)) {
|
if (!DFSUtilClient.isValidName(result)) {
|
||||||
|
@ -2480,23 +2479,4 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final AsyncDistributedFileSystem adfs =
|
|
||||||
new AsyncDistributedFileSystem(this);
|
|
||||||
|
|
||||||
/** @return an {@link AsyncDistributedFileSystem} object. */
|
|
||||||
@Unstable
|
|
||||||
public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
|
|
||||||
return adfs;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Path fixRelativePart(Path p) {
|
|
||||||
return super.fixRelativePart(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
Statistics getFsStatistics() {
|
|
||||||
return statistics;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,14 +24,11 @@ import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
||||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
|
||||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
|
||||||
import org.apache.hadoop.fs.CacheFlag;
|
import org.apache.hadoop.fs.CacheFlag;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
import org.apache.hadoop.fs.CreateFlag;
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
@ -155,14 +152,13 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetPer
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
|
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
|
||||||
|
@ -174,9 +170,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.Client;
|
|
||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
|
||||||
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
||||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
@ -188,9 +182,12 @@ import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenReque
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.google.protobuf.Message;
|
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
||||||
|
import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
|
||||||
|
.EncryptionZoneProto;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
|
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
|
||||||
* while translating from the parameter types used in ClientProtocol to the
|
* while translating from the parameter types used in ClientProtocol to the
|
||||||
|
@ -201,8 +198,6 @@ import com.google.protobuf.ServiceException;
|
||||||
public class ClientNamenodeProtocolTranslatorPB implements
|
public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
|
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
|
||||||
final private ClientNamenodeProtocolPB rpcProxy;
|
final private ClientNamenodeProtocolPB rpcProxy;
|
||||||
private static final ThreadLocal<Callable<?>>
|
|
||||||
RETURN_VALUE_CALLBACK = new ThreadLocal<>();
|
|
||||||
|
|
||||||
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
|
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
|
||||||
GetServerDefaultsRequestProto.newBuilder().build();
|
GetServerDefaultsRequestProto.newBuilder().build();
|
||||||
|
@ -235,12 +230,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
rpcProxy = proxy;
|
rpcProxy = proxy;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Unstable
|
|
||||||
public static <T> Callable<T> getReturnValueCallback() {
|
|
||||||
return (Callable<T>) RETURN_VALUE_CALLBACK.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
RPC.stopProxy(rpcProxy);
|
RPC.stopProxy(rpcProxy);
|
||||||
|
@ -476,7 +465,6 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
RenameRequestProto req = RenameRequestProto.newBuilder()
|
RenameRequestProto req = RenameRequestProto.newBuilder()
|
||||||
.setSrc(src)
|
.setSrc(src)
|
||||||
.setDst(dst).build();
|
.setDst(dst).build();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return rpcProxy.rename(null, req).getResult();
|
return rpcProxy.rename(null, req).getResult();
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
|
@ -501,22 +489,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
|
||||||
setDst(dst).setOverwriteDest(overwrite).
|
setDst(dst).setOverwriteDest(overwrite).
|
||||||
build();
|
build();
|
||||||
try {
|
try {
|
||||||
if (Client.isAsynchronousMode()) {
|
rpcProxy.rename2(null, req);
|
||||||
rpcProxy.rename2(null, req);
|
|
||||||
|
|
||||||
final Callable<Message> returnMessageCallback = ProtobufRpcEngine
|
|
||||||
.getReturnMessageCallback();
|
|
||||||
Callable<Void> callBack = new Callable<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void call() throws Exception {
|
|
||||||
returnMessageCallback.call();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
RETURN_VALUE_CALLBACK.set(callBack);
|
|
||||||
} else {
|
|
||||||
rpcProxy.rename2(null, req);
|
|
||||||
}
|
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
throw ProtobufHelper.getRemoteException(e);
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,258 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.io.DataOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.security.PrivilegedExceptionAction;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.fs.Options.Rename;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class TestAsyncDFSRename {
|
|
||||||
final Path asyncRenameDir = new Path("/test/async_rename/");
|
|
||||||
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
|
|
||||||
final private static Configuration CONF = new HdfsConfiguration();
|
|
||||||
|
|
||||||
final private static String GROUP1_NAME = "group1";
|
|
||||||
final private static String GROUP2_NAME = "group2";
|
|
||||||
final private static String USER1_NAME = "user1";
|
|
||||||
private static final UserGroupInformation USER1;
|
|
||||||
|
|
||||||
private MiniDFSCluster gCluster;
|
|
||||||
|
|
||||||
static {
|
|
||||||
// explicitly turn on permission checking
|
|
||||||
CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
|
|
||||||
|
|
||||||
// create fake mapping for the groups
|
|
||||||
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
|
|
||||||
u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
|
|
||||||
DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
|
|
||||||
|
|
||||||
// Initiate all four users
|
|
||||||
USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
|
|
||||||
GROUP1_NAME, GROUP2_NAME });
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws IOException {
|
|
||||||
gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
|
|
||||||
gCluster.waitActive();
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws IOException {
|
|
||||||
if (gCluster != null) {
|
|
||||||
gCluster.shutdown();
|
|
||||||
gCluster = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int countLease(MiniDFSCluster cluster) {
|
|
||||||
return TestDFSRename.countLease(cluster);
|
|
||||||
}
|
|
||||||
|
|
||||||
void list(DistributedFileSystem dfs, String name) throws IOException {
|
|
||||||
FileSystem.LOG.info("\n\n" + name);
|
|
||||||
for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
|
|
||||||
FileSystem.LOG.info("" + s.getPath());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
|
|
||||||
DataOutputStream a_out = dfs.create(f);
|
|
||||||
a_out.writeBytes("something");
|
|
||||||
a_out.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check the blocks of dst file are cleaned after rename with overwrite
|
|
||||||
* Restart NN to check the rename successfully
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testAsyncRenameWithOverwrite() throws Exception {
|
|
||||||
final short replFactor = 2;
|
|
||||||
final long blockSize = 512;
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
|
|
||||||
replFactor).build();
|
|
||||||
cluster.waitActive();
|
|
||||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
|
||||||
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
|
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
long fileLen = blockSize * 3;
|
|
||||||
String src = "/foo/src";
|
|
||||||
String dst = "/foo/dst";
|
|
||||||
String src2 = "/foo/src2";
|
|
||||||
String dst2 = "/foo/dst2";
|
|
||||||
Path srcPath = new Path(src);
|
|
||||||
Path dstPath = new Path(dst);
|
|
||||||
Path srcPath2 = new Path(src2);
|
|
||||||
Path dstPath2 = new Path(dst2);
|
|
||||||
|
|
||||||
DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
|
|
||||||
DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
|
|
||||||
DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
|
|
||||||
DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
|
|
||||||
|
|
||||||
LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
|
|
||||||
cluster.getNameNode(), dst, 0, fileLen);
|
|
||||||
LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
|
|
||||||
cluster.getNameNode(), dst2, 0, fileLen);
|
|
||||||
BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
|
|
||||||
.getBlockManager();
|
|
||||||
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
|
|
||||||
.getLocalBlock()) != null);
|
|
||||||
assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
|
|
||||||
.getLocalBlock()) != null);
|
|
||||||
|
|
||||||
Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
|
|
||||||
Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
|
|
||||||
retVal1.get();
|
|
||||||
retVal2.get();
|
|
||||||
|
|
||||||
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
|
|
||||||
.getLocalBlock()) == null);
|
|
||||||
assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
|
|
||||||
.getLocalBlock()) == null);
|
|
||||||
|
|
||||||
// Restart NN and check the rename successfully
|
|
||||||
cluster.restartNameNodes();
|
|
||||||
assertFalse(dfs.exists(srcPath));
|
|
||||||
assertTrue(dfs.exists(dstPath));
|
|
||||||
assertFalse(dfs.exists(srcPath2));
|
|
||||||
assertTrue(dfs.exists(dstPath2));
|
|
||||||
} finally {
|
|
||||||
if (dfs != null) {
|
|
||||||
dfs.close();
|
|
||||||
}
|
|
||||||
if (cluster != null) {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
|
|
||||||
final short replFactor = 2;
|
|
||||||
final long blockSize = 512;
|
|
||||||
final Path renameDir = new Path(
|
|
||||||
"/test/concurrent_reanme_with_overwrite_dir/");
|
|
||||||
Configuration conf = new HdfsConfiguration();
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
|
|
||||||
.build();
|
|
||||||
cluster.waitActive();
|
|
||||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
|
||||||
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
|
|
||||||
int count = 1000;
|
|
||||||
|
|
||||||
try {
|
|
||||||
long fileLen = blockSize * 3;
|
|
||||||
assertTrue(dfs.mkdirs(renameDir));
|
|
||||||
|
|
||||||
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
|
|
||||||
|
|
||||||
// concurrently invoking many rename
|
|
||||||
for (int i = 0; i < count; i++) {
|
|
||||||
Path src = new Path(renameDir, "src" + i);
|
|
||||||
Path dst = new Path(renameDir, "dst" + i);
|
|
||||||
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
|
|
||||||
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
|
|
||||||
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
|
|
||||||
returnFutures.put(i, returnFuture);
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for completing the calls
|
|
||||||
for (int i = 0; i < count; i++) {
|
|
||||||
returnFutures.get(i).get();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restart NN and check the rename successfully
|
|
||||||
cluster.restartNameNodes();
|
|
||||||
|
|
||||||
// very the src dir should not exist, dst should
|
|
||||||
for (int i = 0; i < count; i++) {
|
|
||||||
Path src = new Path(renameDir, "src" + i);
|
|
||||||
Path dst = new Path(renameDir, "dst" + i);
|
|
||||||
assertFalse(dfs.exists(src));
|
|
||||||
assertTrue(dfs.exists(dst));
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
dfs.delete(renameDir, true);
|
|
||||||
if (cluster != null) {
|
|
||||||
cluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAsyncRenameWithException() throws Exception {
|
|
||||||
FileSystem rootFs = FileSystem.get(CONF);
|
|
||||||
final Path renameDir = new Path("/test/async_rename_exception/");
|
|
||||||
final Path src = new Path(renameDir, "src");
|
|
||||||
final Path dst = new Path(renameDir, "dst");
|
|
||||||
rootFs.mkdirs(src);
|
|
||||||
|
|
||||||
AsyncDistributedFileSystem adfs = USER1
|
|
||||||
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
|
|
||||||
@Override
|
|
||||||
public AsyncDistributedFileSystem run() throws Exception {
|
|
||||||
return gCluster.getFileSystem().getAsyncDistributedFileSystem();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
|
|
||||||
returnFuture.get();
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
checkPermissionDenied(e, src);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkPermissionDenied(final Exception e, final Path dir) {
|
|
||||||
assertTrue(e.getCause() instanceof ExecutionException);
|
|
||||||
assertTrue("Permission denied messages must carry AccessControlException",
|
|
||||||
e.getMessage().contains("AccessControlException"));
|
|
||||||
assertTrue("Permission denied messages must carry the username", e
|
|
||||||
.getMessage().contains(USER1_NAME));
|
|
||||||
assertTrue("Permission denied messages must carry the path parent", e
|
|
||||||
.getMessage().contains(dir.getParent().toUri().getPath()));
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue