HBASE-12533: staging directories are not deleted after secure bulk load
This commit is contained in:
parent
b2cdeacc8c
commit
f0d95e7f11
|
@ -26,6 +26,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
|
@ -54,33 +55,29 @@ public class SecureBulkLoadClient {
|
|||
|
||||
public String prepareBulkLoad(final TableName tableName) throws IOException {
|
||||
try {
|
||||
return
|
||||
table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
|
||||
EMPTY_START_ROW,
|
||||
LAST_ROW,
|
||||
new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService,String>() {
|
||||
@Override
|
||||
public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
|
||||
SecureBulkLoadProtos.SecureBulkLoadService instance =
|
||||
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
|
||||
|
||||
BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
|
||||
new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
|
||||
SecureBulkLoadProtos.PrepareBulkLoadRequest request =
|
||||
SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
|
||||
BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
|
||||
new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
|
||||
|
||||
instance.prepareBulkLoad(controller,
|
||||
request,
|
||||
rpcCallback);
|
||||
SecureBulkLoadProtos.PrepareBulkLoadRequest request =
|
||||
SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
|
||||
.setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
|
||||
|
||||
SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
|
||||
if (controller.failedOnException()) {
|
||||
throw controller.getFailedOn();
|
||||
}
|
||||
return response.getBulkToken();
|
||||
}
|
||||
}).entrySet().iterator().next().getValue();
|
||||
instance.prepareBulkLoad(controller,
|
||||
request,
|
||||
rpcCallback);
|
||||
|
||||
SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
|
||||
if (controller.failedOnException()) {
|
||||
throw controller.getFailedOn();
|
||||
}
|
||||
|
||||
return response.getBulkToken();
|
||||
} catch (Throwable throwable) {
|
||||
throw new IOException(throwable);
|
||||
}
|
||||
|
@ -88,32 +85,26 @@ public class SecureBulkLoadClient {
|
|||
|
||||
public void cleanupBulkLoad(final String bulkToken) throws IOException {
|
||||
try {
|
||||
table.coprocessorService(SecureBulkLoadProtos.SecureBulkLoadService.class,
|
||||
EMPTY_START_ROW,
|
||||
LAST_ROW,
|
||||
new Batch.Call<SecureBulkLoadProtos.SecureBulkLoadService, String>() {
|
||||
CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
|
||||
SecureBulkLoadProtos.SecureBulkLoadService instance =
|
||||
ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
|
||||
|
||||
@Override
|
||||
public String call(SecureBulkLoadProtos.SecureBulkLoadService instance) throws IOException {
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
ServerRpcController controller = new ServerRpcController();
|
||||
|
||||
BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
|
||||
new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
|
||||
BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
|
||||
new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
|
||||
|
||||
SecureBulkLoadProtos.CleanupBulkLoadRequest request =
|
||||
SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
|
||||
.setBulkToken(bulkToken).build();
|
||||
SecureBulkLoadProtos.CleanupBulkLoadRequest request =
|
||||
SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
|
||||
.setBulkToken(bulkToken).build();
|
||||
|
||||
instance.cleanupBulkLoad(controller,
|
||||
request,
|
||||
rpcCallback);
|
||||
instance.cleanupBulkLoad(controller,
|
||||
request,
|
||||
rpcCallback);
|
||||
|
||||
if (controller.failedOnException()) {
|
||||
throw controller.getFailedOn();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
if (controller.failedOnException()) {
|
||||
throw controller.getFailedOn();
|
||||
}
|
||||
} catch (Throwable throwable) {
|
||||
throw new IOException(throwable);
|
||||
}
|
||||
|
|
|
@ -199,10 +199,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
}
|
||||
}
|
||||
|
||||
fs.delete(createStagingDir(baseStagingDir,
|
||||
getActiveUser(),
|
||||
new Path(request.getBulkToken()).getName()),
|
||||
true);
|
||||
fs.delete(new Path(request.getBulkToken()), true);
|
||||
done.run(CleanupBulkLoadResponse.newBuilder().build());
|
||||
} catch (IOException e) {
|
||||
ResponseConverter.setControllerException(controller, e);
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.IOException;
|
|||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -50,6 +51,7 @@ import org.junit.AfterClass;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
|
||||
|
||||
/**
|
||||
* Test cases for the "load" half of the HFileOutputFormat bulk load
|
||||
|
@ -260,6 +262,16 @@ public class TestLoadIncrementalHFiles {
|
|||
table.close();
|
||||
}
|
||||
|
||||
// verify staging folder has been cleaned up
|
||||
Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
|
||||
if(fs.exists(stagingBasePath)) {
|
||||
FileStatus[] files = fs.listStatus(stagingBasePath);
|
||||
for(FileStatus file : files) {
|
||||
assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
|
||||
file.getPath().getName() != "DONOTERASE");
|
||||
}
|
||||
}
|
||||
|
||||
util.deleteTable(tableName);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue