HBASE-10412 Distributed log replay: Cell tags getting missed (Anoop Sam John)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1561230 13f79535-47bb-0310-9956-ffa450edef68
parent d3cb5d2f3e
commit d967845f5b
HLogSplitter.java (org.apache.hadoop.hbase.regionserver.wal)

@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -67,7 +68,6 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -76,7 +76,6 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -169,22 +168,25 @@ public class HLogSplitter {

   HLogSplitter(Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
-    this.conf = conf;
+    this.conf = HBaseConfiguration.create(conf);
+    String codecClassName = conf
+        .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+    this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
     this.rootDir = rootDir;
     this.fs = fs;
     this.sequenceIdChecker = idChecker;
     this.watcher = zkw;

     entryBuffers = new EntryBuffers(
-        conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
+        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
             128*1024*1024));

     // a larger minBatchSize may slow down recovery because replay writer has to wait for
     // enough edits before replaying them
-    this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
-    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
+    this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
+    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);

-    this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
     if (zkw != null && this.distributedLogReplay) {
       outputSink = new LogReplayOutputSink(numWriterThreads);
     } else {
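The constructor change above is the core of the fix: HLogSplitter now works on a private copy of the configuration and copies the configured WAL cell codec class into the RPC codec key, so the codec that wrote tags into the WAL is also the one used when edits are shipped to region servers during distributed log replay. Below is a minimal sketch of that propagation; the class and method names are illustrative, and the value of HConstants.RPC_CODEC_CONF_KEY is assumed to be "hbase.client.rpc.codec" since it is not shown in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ReplayCodecConfigSketch {
  // Key taken from the patch above.
  static final String WAL_CODEC_KEY = "hbase.regionserver.wal.codec";
  // Assumed value of HConstants.RPC_CODEC_CONF_KEY.
  static final String RPC_CODEC_KEY = "hbase.client.rpc.codec";

  static Configuration replayConf(Configuration clusterConf) {
    // Copy the configuration so the override stays local to the replay path.
    Configuration conf = HBaseConfiguration.create(clusterConf);
    String codecClassName = clusterConf.get(WAL_CODEC_KEY,
        "org.apache.hadoop.hbase.regionserver.wal.WALCellCodec");
    // Make the RPC layer encode cells with the same codec that wrote the WAL,
    // so cell tags survive the trip to the replaying region server.
    conf.set(RPC_CODEC_KEY, codecClassName);
    return conf;
  }
}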
WALCellCodec.java (org.apache.hadoop.hbase.regionserver.wal)

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;

+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
@@ -41,7 +42,11 @@ import com.google.protobuf.ByteString;
 /**
  * Compression in this class is lifted off Compressor/KeyValueCompression.
  * This is a pure coincidence... they are independent and don't have to be compatible.
+ *
+ * This codec is used at server side for writing cells to WAL as well as for sending edits
+ * as part of the distributed splitting process.
  */
+@InterfaceAudience.Private
 public class WALCellCodec implements Codec {
   /** Configuration key for the class to use when encoding cells in the WAL */
   public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
@@ -54,6 +59,13 @@ public class WALCellCodec implements Codec {
   }
   };

+  /**
+   * <b>All subclasses must implement a no argument constructor</b>
+   */
+  public WALCellCodec() {
+    this.compression = null;
+  }
+
   /**
    * Default constructor - <b>all subclasses must implement a constructor with this signature </b>
    * if they are to be dynamically loaded from the {@link Configuration}.
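The new no-argument constructor exists because the codec class name now travels through configuration and is instantiated reflectively on the RPC path, where no Configuration or compression context is at hand. A hypothetical sketch of that kind of reflective loading follows; the names are illustrative and not HBase API.

import org.apache.hadoop.conf.Configuration;

public class CodecLoaderSketch {
  // Loads a codec implementation purely by class name, as the RPC side must.
  @SuppressWarnings("unchecked")
  static <T> T loadByName(Configuration conf, String key, String defaultClassName)
      throws Exception {
    String className = conf.get(key, defaultClassName);
    Class<?> clazz = Class.forName(className);
    // Throws NoSuchMethodException if a subclass forgot the no-argument constructor.
    return (T) clazz.getDeclaredConstructor().newInstance();
  }
}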
TestVisibilityLabels.java (org.apache.hadoop.hbase.security.visibility)

@@ -76,7 +76,7 @@ public class TestVisibilityLabels {
   private static final String PRIVATE = "private";
   private static final String CONFIDENTIAL = "confidential";
   private static final String SECRET = "secret";
-  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final byte[] row1 = Bytes.toBytes("row1");
   private static final byte[] row2 = Bytes.toBytes("row2");
   private static final byte[] row3 = Bytes.toBytes("row3");
@@ -84,23 +84,23 @@ public class TestVisibilityLabels {
   private final static byte[] fam = Bytes.toBytes("info");
   private final static byte[] qual = Bytes.toBytes("qual");
   private final static byte[] value = Bytes.toBytes("value");
-  private static Configuration conf;
+  public static Configuration conf;

   private volatile boolean killedRS = false;
   @Rule
   public final TestName TEST_NAME = new TestName();
-  private static User SUPERUSER;
+  public static User SUPERUSER;

   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     // setup configuration
     conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
     conf.setInt("hfile.format.version", 3);
     conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
     conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());
     conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
         ScanLabelGenerator.class);
-    String currentUser = User.getCurrent().getName();
     conf.set("hbase.superuser", "admin");
     TEST_UTIL.startMiniCluster(2);
     SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
@@ -334,6 +334,8 @@ public class TestVisibilityLabels {
   @Test(timeout = 60 * 1000)
   public void testVisibilityLabelsOnRSRestart() throws Exception {
     final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    HTable table = createTableAndWriteDataWithLabels(tableName, "(" + SECRET + "|" + CONFIDENTIAL
+        + ")", PRIVATE);
     List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
         .getRegionServerThreads();
     for (RegionServerThread rsThread : regionServerThreads) {
@@ -342,8 +344,6 @@ public class TestVisibilityLabels {
     // Start one new RS
     RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
     waitForLabelsRegionAvailability(rs.getRegionServer());
-    HTable table = createTableAndWriteDataWithLabels(tableName, "(" + SECRET + "|" + CONFIDENTIAL
-        + ")", PRIVATE);
     try {
       Scan s = new Scan();
       s.setAuthorizations(new Authorizations(SECRET));
@@ -486,7 +486,9 @@ public class TestVisibilityLabels {
     HTable ht = null;
     try {
       ht = new HTable(conf, LABELS_TABLE_NAME);
-      ResultScanner scanner = ht.getScanner(new Scan());
+      Scan scan = new Scan();
+      scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
+      ResultScanner scanner = ht.getScanner(scan);
       Result result = null;
       while ((result = scanner.next()) != null) {
         Cell label = result.getColumnLatestCell(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
@@ -739,7 +741,7 @@ public class TestVisibilityLabels {
     return table;
   }

-  private static void addLabels() throws Exception {
+  public static void addLabels() throws Exception {
     PrivilegedExceptionAction<VisibilityLabelsResponse> action =
         new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
       public VisibilityLabelsResponse run() throws Exception {
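One test hunk above also tightens the labels-table scan: the cells stored in the labels table carry the system visibility label, so a plain Scan may have them filtered out and must pass that label as an authorization. A minimal sketch of building such a scan, assuming VisibilityUtils.SYSTEM_LABEL resolves to the string "system" (the value is not shown in this diff):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.security.visibility.Authorizations;

public class LabelsTableScanSketch {
  static Scan labelsTableScan() {
    Scan scan = new Scan();
    // Pass the system label as an authorization so the labels-table cells stay visible.
    scan.setAuthorizations(new Authorizations("system")); // assumed value of VisibilityUtils.SYSTEM_LABEL
    return scan;
  }
}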
TestVisibilityLabelsWithDistributedLogReplay.java (new file, org.apache.hadoop.hbase.security.visibility)

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.security.User;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test class that tests the visibility labels with distributed log replay feature ON.
+ */
+@Category(MediumTests.class)
+public class TestVisibilityLabelsWithDistributedLogReplay extends TestVisibilityLabels {
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // setup configuration
+    conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt("hfile.format.version", 3);
+    conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
+    conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());
+    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
+        ScanLabelGenerator.class);
+    conf.set("hbase.superuser", "admin");
+    TEST_UTIL.startMiniCluster(2);
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
+
+    // Wait for the labels table to become available
+    TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000);
+    addLabels();
+  }
+}
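The new test subclass reuses every test case in TestVisibilityLabels and only flips distributed log replay on before the mini cluster starts, which is why the parent's fields and addLabels() were widened to public/static above. Outside of tests, the same switch would be set in the cluster configuration; a minimal sketch, assuming HConstants.DISTRIBUTED_LOG_REPLAY_KEY resolves to "hbase.master.distributed.log.replay":

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EnableLogReplaySketch {
  static Configuration withDistributedLogReplay() {
    Configuration conf = HBaseConfiguration.create();
    // Assumed key name for HConstants.DISTRIBUTED_LOG_REPLAY_KEY.
    conf.setBoolean("hbase.master.distributed.log.replay", true);
    return conf;
  }
}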