HBASE-12533: staging directories are not deleted after secure bulk load

This commit is contained in:
Jeffrey Zhong 2014-11-26 16:23:11 -08:00
parent def45c2185
commit 643de2df04
3 changed files with 48 additions and 48 deletions

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@ -55,33 +56,29 @@ public class SecureBulkLoadClient {
public String prepareBulkLoad(final TableName tableName) throws IOException {
try {
return
table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
EMPTY_START_ROW,
LAST_ROW,
new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService,String>() {
@Override
public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
SecureBulkLoadProtos.SecureBulkLoadService instance =
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
ServerRpcController controller = new ServerRpcController();
SecureBulkLoadProtos.PrepareBulkLoadRequest request =
SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
instance.prepareBulkLoad(controller,
request,
rpcCallback);
SecureBulkLoadProtos.PrepareBulkLoadRequest request =
SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return response.getBulkToken();
}
}).entrySet().iterator().next().getValue();
instance.prepareBulkLoad(controller,
request,
rpcCallback);
SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return response.getBulkToken();
} catch (Throwable throwable) {
throw new IOException(throwable);
}
@ -89,32 +86,26 @@ public class SecureBulkLoadClient {
public void cleanupBulkLoad(final String bulkToken) throws IOException {
try {
table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
EMPTY_START_ROW,
LAST_ROW,
new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService, String>() {
CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
SecureBulkLoadProtos.SecureBulkLoadService instance =
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
@Override
public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
ServerRpcController controller = new ServerRpcController();
ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
SecureBulkLoadProtos.CleanupBulkLoadRequest request =
SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
.setBulkToken(bulkToken).build();
SecureBulkLoadProtos.CleanupBulkLoadRequest request =
SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
.setBulkToken(bulkToken).build();
instance.cleanupBulkLoad(controller,
request,
rpcCallback);
instance.cleanupBulkLoad(controller,
request,
rpcCallback);
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
return null;
}
});
if (controller.failedOnException()) {
throw controller.getFailedOn();
}
} catch (Throwable throwable) {
throw new IOException(throwable);
}

View File

@ -198,10 +198,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
}
}
fs.delete(createStagingDir(baseStagingDir,
getActiveUser(),
new Path(request.getBulkToken()).getName()),
true);
fs.delete(new Path(request.getBulkToken()), true);
done.run(CleanupBulkLoadResponse.newBuilder().build());
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);

View File

@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -49,6 +50,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
/**
* Test cases for the "load" half of the HFileOutputFormat bulk load
@ -259,6 +261,16 @@ public class TestLoadIncrementalHFiles {
table.close();
}
// verify staging folder has been cleaned up
Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
if(fs.exists(stagingBasePath)) {
FileStatus[] files = fs.listStatus(stagingBasePath);
for(FileStatus file : files) {
assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
file.getPath().getName() != "DONOTERASE");
}
}
util.deleteTable(tableName);
}