HBASE-11821 [ImportTSV] Abstract labels tags creation into pluggable Interface.

anoopsjohn 2014-09-02 16:30:37 +05:30
parent d16c9d3776
commit 310ac4f71d
11 changed files with 451 additions and 371 deletions
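
In short, the hard-wired LabelExpander logic in ImportTSV is replaced by a CellCreator facade that delegates visibility-tag creation to a pluggable VisibilityExpressionResolver, chosen through the hbase.mapreduce.visibility.expression.resolver.class configuration key. A minimal sketch of how a job could point that key at a custom resolver; the class name "com.example.MyVisibilityExpressionResolver" is purely hypothetical and stands in for any implementation of the new interface added below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.CellCreator;

public class ResolverWiringSketch {
  // Sketch only: the resolver class named here is hypothetical.
  static Configuration withCustomResolver() {
    Configuration conf = HBaseConfiguration.create();
    // When this key is unset, CellCreator falls back to DefaultVisibilityExpressionResolver,
    // which resolves label ordinals by scanning the 'labels' table.
    conf.set(CellCreator.VISIBILITY_EXP_RESOLVER_CLASS,
        "com.example.MyVisibilityExpressionResolver");
    return conf;
  }
}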

CellCreator.java (new file)

@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Facade to create Cells for HFileOutputFormat. The created Cells are of <code>Put</code> type.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CellCreator {
public static final String VISIBILITY_EXP_RESOLVER_CLASS =
"hbase.mapreduce.visibility.expression.resolver.class";
private VisibilityExpressionResolver visExpResolver;
public CellCreator(Configuration conf) {
Class<? extends VisibilityExpressionResolver> clazz = conf.getClass(
VISIBILITY_EXP_RESOLVER_CLASS, DefaultVisibilityExpressionResolver.class,
VisibilityExpressionResolver.class);
this.visExpResolver = ReflectionUtils.newInstance(clazz, conf);
this.visExpResolver.init();
}
/**
* @param row row key
* @param roffset row offset
* @param rlength row length
* @param family family name
* @param foffset family offset
* @param flength family length
* @param qualifier column qualifier
* @param qoffset qualifier offset
* @param qlength qualifier length
* @param timestamp version timestamp
* @param value column value
* @param voffset value offset
* @param vlength value length
* @return created Cell
* @throws IOException
*/
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength) throws IOException {
return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
timestamp, value, voffset, vlength, null);
}
/**
* @param row row key
* @param roffset row offset
* @param rlength row length
* @param family family name
* @param foffset family offset
* @param flength family length
* @param qualifier column qualifier
* @param qoffset qualifier offset
* @param qlength qualifier length
* @param timestamp version timestamp
* @param value column value
* @param voffset value offset
* @param vlength value length
* @param visExpression visibility expression to be associated with cell
* @return created Cell
* @throws IOException
*/
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength, String visExpression) throws IOException {
List<Tag> visTags = null;
if (visExpression != null) {
visTags = this.visExpResolver.createVisibilityExpTags(visExpression);
}
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
}
}
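
The facade is used the same way from both TsvImporterMapper and TextSortReducer (see the diffs below). A condensed usage sketch, assuming the literal byte arrays stand in for the offsets that ImportTsv's TsvParser normally supplies and "secret|topsecret" is just an illustrative visibility expression:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.mapreduce.CellCreator;
import org.apache.hadoop.hbase.util.Bytes;

public class CellCreatorSketch {
  // Sketch only: with the default resolver configured, constructing CellCreator will
  // try to read the 'labels' table, so this assumes a running cluster or a custom resolver.
  static Cell buildCell(Configuration conf) throws IOException {
    CellCreator kvCreator = new CellCreator(conf);
    byte[] row = Bytes.toBytes("row1");
    byte[] family = Bytes.toBytes("f");
    byte[] qualifier = Bytes.toBytes("q");
    byte[] value = Bytes.toBytes("v");
    // Passing a visibility expression makes the configured resolver translate it into
    // visibility tags on the created KeyValue; pass null to create a plain Put cell.
    return kvCreator.create(row, 0, row.length, family, 0, family.length,
        qualifier, 0, qualifier.length, System.currentTimeMillis(),
        value, 0, value.length, "secret|topsecret");
  }
}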

DefaultVisibilityExpressionResolver.java (new file)

@@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelOrdinalProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
import org.apache.hadoop.hbase.util.Bytes;
/**
* This implementation creates tags by expanding the expression using label ordinals. Labels will be
* serialized in sorted order of their ordinals.
*/
@InterfaceAudience.Private
public class DefaultVisibilityExpressionResolver implements VisibilityExpressionResolver {
private static final Log LOG = LogFactory.getLog(DefaultVisibilityExpressionResolver.class);
private Configuration conf;
private final Map<String, Integer> labels = new HashMap<String, Integer>();
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public void init() {
// Reading all the labels and their ordinals.
// This scan should be done by a user with global_admin privileges. Ensure that it works.
HTable labelsTable = null;
try {
labelsTable = new HTable(conf, LABELS_TABLE_NAME);
} catch (TableNotFoundException e) {
// Just return without doing anything. When the VisibilityController is not in use, the 'labels'
// table won't be present in the cluster.
return;
} catch (IOException e) {
LOG.error("Error opening 'labels' table", e);
return;
}
Scan scan = new Scan();
scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
ResultScanner scanner = null;
try {
scanner = labelsTable.getScanner(scan);
Result next = null;
while ((next = scanner.next()) != null) {
byte[] row = next.getRow();
byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
labels.put(Bytes.toString(value), Bytes.toInt(row));
}
} catch (IOException e) {
LOG.error("Error reading 'labels' table", e);
} finally {
try {
if (scanner != null) {
scanner.close();
}
} finally {
try {
labelsTable.close();
} catch (IOException e) {
LOG.warn("Error on closing 'labels' table", e);
}
}
}
}
@Override
public List<Tag> createVisibilityExpTags(String visExpression) throws IOException {
VisibilityLabelOrdinalProvider provider = new VisibilityLabelOrdinalProvider() {
@Override
public int getLabelOrdinal(String label) {
return labels.get(label);
}
};
return VisibilityUtils.createVisibilityExpTags(visExpression, true, false, null, provider);
}
}

LabelExpander.java (deleted file)

@@ -1,232 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.ExpressionExpander;
import org.apache.hadoop.hbase.security.visibility.ExpressionParser;
import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
import org.apache.hadoop.hbase.security.visibility.ParseException;
import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
import org.apache.hadoop.hbase.security.visibility.expression.LeafExpressionNode;
import org.apache.hadoop.hbase.security.visibility.expression.NonLeafExpressionNode;
import org.apache.hadoop.hbase.security.visibility.expression.Operator;
import org.apache.hadoop.hbase.util.Bytes;
/**
* An utility class that helps the mapper and reducers used with visibility to
* scan the visibility_labels and helps in parsing and expanding the visibility
* tags
*
*/
@InterfaceAudience.Private
public class LabelExpander {
private Configuration conf;
private ExpressionParser parser = new ExpressionParser();
private ExpressionExpander expander = new ExpressionExpander();
public LabelExpander(Configuration conf) {
this.conf = conf;
}
private Map<String, Integer> labels;
// TODO : The code repeats from that in Visibility Controller.. Refactoring
// may be needed
private List<Tag> createVisibilityTags(String visibilityLabelsExp) throws IOException,
ParseException, InvalidLabelException {
ExpressionNode node = null;
node = parser.parse(visibilityLabelsExp);
node = expander.expand(node);
List<Tag> tags = new ArrayList<Tag>();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
List<Integer> labelOrdinals = new ArrayList<Integer>();
// We will be adding this tag before the visibility tags and the presence of
// this
// tag indicates we are supporting deletes with cell visibility
tags.add(VisibilityUtils.SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG);
if (node.isSingleNode()) {
getLabelOrdinals(node, labelOrdinals);
writeLabelOrdinalsToStream(labelOrdinals, dos);
tags.add(new Tag(TagType.VISIBILITY_TAG_TYPE, baos.toByteArray()));
baos.reset();
} else {
NonLeafExpressionNode nlNode = (NonLeafExpressionNode) node;
if (nlNode.getOperator() == Operator.OR) {
for (ExpressionNode child : nlNode.getChildExps()) {
getLabelOrdinals(child, labelOrdinals);
writeLabelOrdinalsToStream(labelOrdinals, dos);
tags.add(new Tag(TagType.VISIBILITY_TAG_TYPE, baos.toByteArray()));
baos.reset();
labelOrdinals.clear();
}
} else {
getLabelOrdinals(nlNode, labelOrdinals);
writeLabelOrdinalsToStream(labelOrdinals, dos);
tags.add(new Tag(TagType.VISIBILITY_TAG_TYPE, baos.toByteArray()));
baos.reset();
}
}
return tags;
}
private void writeLabelOrdinalsToStream(List<Integer> labelOrdinals, DataOutputStream dos)
throws IOException {
Collections.sort(labelOrdinals);
for (Integer labelOrdinal : labelOrdinals) {
StreamUtils.writeRawVInt32(dos, labelOrdinal);
}
}
private void getLabelOrdinals(ExpressionNode node, List<Integer> labelOrdinals)
throws IOException, InvalidLabelException {
if (node.isSingleNode()) {
String identifier = null;
int labelOrdinal = 0;
if (node instanceof LeafExpressionNode) {
identifier = ((LeafExpressionNode) node).getIdentifier();
labelOrdinal = this.labels.get(identifier);
} else {
// This is a NOT node.
LeafExpressionNode lNode = (LeafExpressionNode) ((NonLeafExpressionNode) node)
.getChildExps().get(0);
identifier = lNode.getIdentifier();
labelOrdinal = this.labels.get(identifier);
labelOrdinal = -1 * labelOrdinal; // Store NOT node as -ve ordinal.
}
if (labelOrdinal == 0) {
throw new InvalidLabelException("Invalid visibility label " + identifier);
}
labelOrdinals.add(labelOrdinal);
} else {
List<ExpressionNode> childExps = ((NonLeafExpressionNode) node).getChildExps();
for (ExpressionNode child : childExps) {
getLabelOrdinals(child, labelOrdinals);
}
}
}
private void createLabels() throws IOException {
// This scan should be done by user with global_admin previliges.. Ensure
// that it works
HTable visibilityLabelsTable = null;
ResultScanner scanner = null;
try {
labels = new HashMap<String, Integer>();
visibilityLabelsTable = new HTable(conf, LABELS_TABLE_NAME.getName());
Scan scan = new Scan();
scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
scanner = visibilityLabelsTable.getScanner(scan);
while (true) {
Result next = scanner.next();
if (next == null) {
break;
}
byte[] row = next.getRow();
byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
labels.put(Bytes.toString(value), Bytes.toInt(row));
}
} finally {
try {
if (scanner != null) {
scanner.close();
}
} finally {
if (visibilityLabelsTable != null) {
visibilityLabelsTable.close();
}
}
}
}
/**
* Creates a kv from the cell visibility expr specified in the ImportTSV and uses it as the
* visibility tag in the kv
* @param rowKeyOffset
* @param rowKeyLength
* @param family
* @param familyOffset
* @param familyLength
* @param qualifier
* @param qualifierOffset
* @param qualifierLength
* @param ts
* @param put
* @param lineBytes
* @param columnOffset
* @param columnLength
* @param cellVisibilityExpr
* @return KeyValue from the cell visibility expr
* @throws IOException
* @throws BadTsvLineException
* @throws ParseException
*/
public KeyValue createKVFromCellVisibilityExpr(int rowKeyOffset, int rowKeyLength, byte[] family,
int familyOffset, int familyLength, byte[] qualifier, int qualifierOffset,
int qualifierLength, long ts, Type put, byte[] lineBytes, int columnOffset, int columnLength,
String cellVisibilityExpr) throws IOException, BadTsvLineException {
if(this.labels == null && cellVisibilityExpr != null) {
createLabels();
}
KeyValue kv = null;
if (cellVisibilityExpr != null) {
// Apply the expansion and parsing here
try {
List<Tag> visibilityTags = createVisibilityTags(cellVisibilityExpr);
kv = new KeyValue(lineBytes, rowKeyOffset, rowKeyLength, family, familyOffset,
familyLength, qualifier, qualifierOffset, qualifierLength, ts, KeyValue.Type.Put,
lineBytes, columnOffset, columnLength, visibilityTags);
} catch (ParseException e) {
throw new BadTsvLineException("Parse Exception " + e.getMessage());
}
} else {
kv = new KeyValue(lineBytes, rowKeyOffset, rowKeyLength, family, familyOffset, familyLength,
qualifier, qualifierOffset, qualifierLength, ts, KeyValue.Type.Put, lineBytes, columnOffset,
columnLength);
}
return kv;
}
}

TextSortReducer.java (modified)

@@ -25,7 +25,9 @@ import java.util.TreeSet;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.io.Text;
@@ -34,8 +36,7 @@ import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.StringUtils;
 /**
- * Emits Sorted KeyValues. Reads the text passed, parses it and creates the Key Values then Sorts
- * them and emits Keyalues in sorted order.
+ * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit.
  * @see HFileOutputFormat
  * @see KeyValueSortReducer
  * @see PutSortReducer
@@ -61,7 +62,7 @@ public class TextSortReducer extends
   /** Cell visibility expr **/
   private String cellVisibilityExpr;
-  private LabelExpander labelExpander;
+  private CellCreator kvCreator;
   public long getTs() {
     return ts;
@@ -97,7 +98,7 @@ public class TextSortReducer extends
     if (parser.getRowKeyColumnIndex() == -1) {
       throw new RuntimeException("No row key column specified");
     }
-    labelExpander = new LabelExpander(conf);
+    this.kvCreator = new CellCreator(conf);
   }
   /**
@@ -153,20 +154,13 @@ public class TextSortReducer extends
             || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
           continue;
         }
-        KeyValue kv = null;
-        if (cellVisibilityExpr == null) {
-          kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
-              parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
-              parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
-              parsed.getColumnOffset(i), parsed.getColumnLength(i));
-        } else {
-          // Should ensure that VisibilityController is present
-          kv = labelExpander.createKVFromCellVisibilityExpr(
-              parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0,
-              parser.getFamily(i).length, parser.getQualifier(i), 0,
-              parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
-              parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
-        }
+        // Creating the KV which needs to be directly written to HFiles. Using the Facade
+        // KVCreator for creation of kvs.
+        Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
+            parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
+            parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
+            parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
         kvs.add(kv);
         curSize += kv.heapSize();
       }

TsvImporterMapper.java (modified)

@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -58,9 +59,9 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
   protected String cellVisibilityExpr;
-  private String hfileOutPath;
-  private LabelExpander labelExpander;
+  protected CellCreator kvCreator;
+  private String hfileOutPath;
   public long getTs() {
     return ts;
@@ -96,7 +97,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
     if (parser.getRowKeyColumnIndex() == -1) {
       throw new RuntimeException("No row key column specified");
     }
-    labelExpander = new LabelExpander(conf);
+    this.kvCreator = new CellCreator(conf);
   }
   /**
@@ -150,7 +151,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
             || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
           continue;
         }
-        KeyValue kv = createPuts(lineBytes, parsed, put, i);
+        populatePut(lineBytes, parsed, put, i);
       }
       context.write(rowKey, put);
     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
@@ -178,11 +179,11 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
     }
   }
-  protected KeyValue createPuts(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
+  protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
       int i) throws BadTsvLineException, IOException {
-    KeyValue kv = null;
+    Cell cell = null;
     if (hfileOutPath == null) {
-      kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+      cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
          parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
          parsed.getColumnOffset(i), parsed.getColumnLength(i));
@@ -192,13 +193,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
       }
     } else {
-      kv = labelExpander.createKVFromCellVisibilityExpr(
-          parsed.getRowKeyOffset(), parsed.getRowKeyLength(), parser.getFamily(i), 0,
-          parser.getFamily(i).length, parser.getQualifier(i), 0,
-          parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
-          parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+      // Creating the KV which needs to be directly written to HFiles. Using the Facade
+      // KVCreator for creation of kvs.
+      cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+          parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
+          parsed.getColumnLength(i), cellVisibilityExpr);
     }
-    put.add(kv);
-    return kv;
+    put.add(cell);
   }
 }

VisibilityExpressionResolver.java (new file)

@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.Tag;
/**
* Interface to convert visibility expressions into Tags for storing along with Cells in HFiles.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface VisibilityExpressionResolver extends Configurable {
/**
* Gives the implementation a chance to initialize itself.
*/
void init();
/**
* Converts a visibility expression into the tags to be serialized.
* @param visExpression the label expression
* @return The list of tags corresponding to the visibility expression. These tags will be stored
* along with the Cells.
*/
List<Tag> createVisibilityExpTags(String visExpression) throws IOException;
}
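
To illustrate the extension point, here is a hedged sketch of a custom implementation that resolves label ordinals from a fixed in-memory map instead of scanning the 'labels' table. The class name and the hard-coded labels are assumptions for the example; it reuses the VisibilityUtils.createVisibilityExpTags helper introduced later in this commit, just as DefaultVisibilityExpressionResolver does.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.mapreduce.VisibilityExpressionResolver;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelOrdinalProvider;
import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;

// Hypothetical resolver (not part of this commit) with a static label-to-ordinal map,
// e.g. for tests or for clusters where the labels are known up front.
public class StaticVisibilityExpressionResolver implements VisibilityExpressionResolver {
  private final Map<String, Integer> ordinals = new HashMap<String, Integer>();
  private Configuration conf;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  @Override
  public void init() {
    // Ordinals start from 1; 0 means "unknown label".
    ordinals.put("secret", 1);
    ordinals.put("topsecret", 2);
  }

  @Override
  public List<Tag> createVisibilityExpTags(String visExpression) throws IOException {
    VisibilityLabelOrdinalProvider provider = new VisibilityLabelOrdinalProvider() {
      @Override
      public int getLabelOrdinal(String label) {
        Integer ordinal = ordinals.get(label);
        return ordinal != null ? ordinal.intValue() : 0;
      }
    };
    // Same utility the default resolver delegates to (see VisibilityUtils below).
    return VisibilityUtils.createVisibilityExpTags(visExpression, true, false, null, provider);
  }
}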

DefaultVisibilityLabelServiceImpl.java (modified)

@@ -54,13 +54,8 @@ import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
-import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
-import org.apache.hadoop.hbase.security.visibility.expression.LeafExpressionNode;
-import org.apache.hadoop.hbase.security.visibility.expression.NonLeafExpressionNode;
-import org.apache.hadoop.hbase.security.visibility.expression.Operator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -78,8 +73,6 @@ public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService
   private static final byte[] DUMMY_VALUE = new byte[0];
   private volatile int ordinalCounter = -1;
-  private final ExpressionParser expressionParser = new ExpressionParser();
-  private final ExpressionExpander expressionExpander = new ExpressionExpander();
   private Configuration conf;
   private HRegion labelsRegion;
   private VisibilityLabelsCache labelsCache;
@@ -375,107 +368,12 @@ public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService
   @Override
   public List<Tag> createVisibilityExpTags(String visExpression, boolean withSerializationFormat,
       boolean checkAuths) throws IOException {
-    ExpressionNode node = null;
-    try {
-      node = this.expressionParser.parse(visExpression);
-    } catch (ParseException e) {
-      throw new IOException(e);
-    }
-    node = this.expressionExpander.expand(node);
-    List<Tag> tags = new ArrayList<Tag>();
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(baos);
-    List<Integer> labelOrdinals = new ArrayList<Integer>();
-    // We will be adding this tag before the visibility tags and the presence of this
-    // tag indicates we are supporting deletes with cell visibility
-    if (withSerializationFormat) {
-      tags.add(VisibilityUtils.SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG);
-    }
     Set<Integer> auths = null;
     if (checkAuths) {
       auths = this.labelsCache.getAuthsAsOrdinals(VisibilityUtils.getActiveUser().getShortName());
     }
-    if (node.isSingleNode()) {
-      getLabelOrdinals(node, labelOrdinals, auths, checkAuths);
-      writeLabelOrdinalsToStream(labelOrdinals, dos);
-      tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
-      baos.reset();
-    } else {
-      NonLeafExpressionNode nlNode = (NonLeafExpressionNode) node;
-      if (nlNode.getOperator() == Operator.OR) {
-        for (ExpressionNode child : nlNode.getChildExps()) {
-          getLabelOrdinals(child, labelOrdinals, auths, checkAuths);
-          writeLabelOrdinalsToStream(labelOrdinals, dos);
-          tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
-          baos.reset();
-          labelOrdinals.clear();
-        }
-      } else {
-        getLabelOrdinals(nlNode, labelOrdinals, auths, checkAuths);
-        writeLabelOrdinalsToStream(labelOrdinals, dos);
-        tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
-        baos.reset();
-      }
-    }
-    return tags;
-  }
-  protected void getLabelOrdinals(ExpressionNode node, List<Integer> labelOrdinals,
-      Set<Integer> auths, boolean checkAuths) throws IOException, InvalidLabelException {
-    if (node.isSingleNode()) {
-      String identifier = null;
-      int labelOrdinal = 0;
-      if (node instanceof LeafExpressionNode) {
-        identifier = ((LeafExpressionNode) node).getIdentifier();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("The identifier is " + identifier);
-        }
-        labelOrdinal = this.labelsCache.getLabelOrdinal(identifier);
-        checkAuths(auths, labelOrdinal, identifier, checkAuths);
-      } else {
-        // This is a NOT node.
-        LeafExpressionNode lNode = (LeafExpressionNode) ((NonLeafExpressionNode) node)
-            .getChildExps().get(0);
-        identifier = lNode.getIdentifier();
-        labelOrdinal = this.labelsCache.getLabelOrdinal(identifier);
-        checkAuths(auths, labelOrdinal, identifier, checkAuths);
-        labelOrdinal = -1 * labelOrdinal; // Store NOT node as -ve ordinal.
-      }
-      if (labelOrdinal == 0) {
-        throw new InvalidLabelException("Invalid visibility label " + identifier);
-      }
-      labelOrdinals.add(labelOrdinal);
-    } else {
-      List<ExpressionNode> childExps = ((NonLeafExpressionNode) node).getChildExps();
-      for (ExpressionNode child : childExps) {
-        getLabelOrdinals(child, labelOrdinals, auths, checkAuths);
-      }
-    }
-  }
-  private void checkAuths(Set<Integer> auths, int labelOrdinal, String identifier,
-      boolean checkAuths) throws IOException {
-    if (checkAuths) {
-      if (auths == null || (!auths.contains(labelOrdinal))) {
-        throw new AccessDeniedException("Visibility label " + identifier
-            + " not authorized for the user " + VisibilityUtils.getActiveUser().getShortName());
-      }
-    }
-  }
-  /**
-   * This will sort the passed labels in ascending oder and then will write one after the other
-   * to the passed stream.
-   * @param labelOrdinals Unsorted label ordinals
-   * @param dos Stream where to write the labels.
-   * @throws IOException When IOE during writes to Stream.
-   */
-  protected void writeLabelOrdinalsToStream(List<Integer> labelOrdinals, DataOutputStream dos)
-      throws IOException {
-    Collections.sort(labelOrdinals);
-    for (Integer labelOrdinal : labelOrdinals) {
-      StreamUtils.writeRawVInt32(dos, labelOrdinal);
-    }
+    return VisibilityUtils.createVisibilityExpTags(visExpression, withSerializationFormat,
+      checkAuths, auths, labelsCache);
   }
   protected void updateZk(boolean labelAddition) throws IOException {

VisibilityLabelOrdinalProvider.java (new file)

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security.visibility;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
public interface VisibilityLabelOrdinalProvider {
/**
* @param label Not null label string
* @return The ordinal for the label. The ordinal starts from 1. Returns 0 when passed a non
* existing label.
*/
public int getLabelOrdinal(String label);
}

VisibilityLabelsCache.java (modified)

@@ -45,7 +45,7 @@ import org.apache.zookeeper.KeeperException;
  * znode for labels table
  */
 @InterfaceAudience.Private
-public class VisibilityLabelsCache {
+public class VisibilityLabelsCache implements VisibilityLabelOrdinalProvider {
   private static final Log LOG = LogFactory.getLog(VisibilityLabelsCache.class);
   private static final int NON_EXIST_LABEL_ORDINAL = 0;
@@ -153,6 +153,7 @@ public class VisibilityLabelsCache {
    * @return The ordinal for the label. The ordinal starts from 1. Returns 0 when passed a non
    *         existing label.
    */
+  @Override
   public int getLabelOrdinal(String label) {
     Integer ordinal = null;
     this.lock.readLock().lock();

VisibilityUtils.java (modified)

@@ -19,12 +19,16 @@ package org.apache.hadoop.hbase.security.visibility;
 import static org.apache.hadoop.hbase.TagType.VISIBILITY_TAG_TYPE;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Map.Entry;
 import org.apache.commons.lang.StringUtils;
@@ -39,6 +43,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
@@ -46,7 +51,12 @@ import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.UserAut
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabel;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsRequest;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.LeafExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.NonLeafExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.Operator;
 import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -71,6 +81,9 @@ public class VisibilityUtils {
       VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG_VAL);
   private static final String COMMA = ",";
+  private static final ExpressionParser EXP_PARSER = new ExpressionParser();
+  private static final ExpressionExpander EXP_EXPANDER = new ExpressionExpander();
   /**
    * Creates the labels data to be written to zookeeper.
    * @param existingLabels
@@ -248,4 +261,110 @@ public class VisibilityUtils {
     }
     return user;
   }
+  public static List<Tag> createVisibilityExpTags(String visExpression,
+      boolean withSerializationFormat, boolean checkAuths, Set<Integer> auths,
+      VisibilityLabelOrdinalProvider ordinalProvider) throws IOException {
+    ExpressionNode node = null;
+    try {
+      node = EXP_PARSER.parse(visExpression);
+    } catch (ParseException e) {
+      throw new IOException(e);
+    }
+    node = EXP_EXPANDER.expand(node);
+    List<Tag> tags = new ArrayList<Tag>();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    List<Integer> labelOrdinals = new ArrayList<Integer>();
+    // We will be adding this tag before the visibility tags and the presence of this
+    // tag indicates we are supporting deletes with cell visibility
+    if (withSerializationFormat) {
+      tags.add(VisibilityUtils.SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG);
+    }
+    if (node.isSingleNode()) {
+      getLabelOrdinals(node, labelOrdinals, auths, checkAuths, ordinalProvider);
+      writeLabelOrdinalsToStream(labelOrdinals, dos);
+      tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
+      baos.reset();
+    } else {
+      NonLeafExpressionNode nlNode = (NonLeafExpressionNode) node;
+      if (nlNode.getOperator() == Operator.OR) {
+        for (ExpressionNode child : nlNode.getChildExps()) {
+          getLabelOrdinals(child, labelOrdinals, auths, checkAuths, ordinalProvider);
+          writeLabelOrdinalsToStream(labelOrdinals, dos);
+          tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
+          baos.reset();
+          labelOrdinals.clear();
+        }
+      } else {
+        getLabelOrdinals(nlNode, labelOrdinals, auths, checkAuths, ordinalProvider);
+        writeLabelOrdinalsToStream(labelOrdinals, dos);
+        tags.add(new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray()));
+        baos.reset();
+      }
+    }
+    return tags;
+  }
+  private static void getLabelOrdinals(ExpressionNode node, List<Integer> labelOrdinals,
+      Set<Integer> auths, boolean checkAuths, VisibilityLabelOrdinalProvider ordinalProvider)
+      throws IOException, InvalidLabelException {
+    if (node.isSingleNode()) {
+      String identifier = null;
+      int labelOrdinal = 0;
+      if (node instanceof LeafExpressionNode) {
+        identifier = ((LeafExpressionNode) node).getIdentifier();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("The identifier is " + identifier);
+        }
+        labelOrdinal = ordinalProvider.getLabelOrdinal(identifier);
+        checkAuths(auths, labelOrdinal, identifier, checkAuths);
+      } else {
+        // This is a NOT node.
+        LeafExpressionNode lNode = (LeafExpressionNode) ((NonLeafExpressionNode) node)
+            .getChildExps().get(0);
+        identifier = lNode.getIdentifier();
+        labelOrdinal = ordinalProvider.getLabelOrdinal(identifier);
+        checkAuths(auths, labelOrdinal, identifier, checkAuths);
+        labelOrdinal = -1 * labelOrdinal; // Store NOT node as -ve ordinal.
+      }
+      if (labelOrdinal == 0) {
+        throw new InvalidLabelException("Invalid visibility label " + identifier);
+      }
+      labelOrdinals.add(labelOrdinal);
+    } else {
+      List<ExpressionNode> childExps = ((NonLeafExpressionNode) node).getChildExps();
+      for (ExpressionNode child : childExps) {
+        getLabelOrdinals(child, labelOrdinals, auths, checkAuths, ordinalProvider);
+      }
+    }
+  }
+  /**
+   * This will sort the passed labels in ascending oder and then will write one after the other to
+   * the passed stream.
+   * @param labelOrdinals
+   *          Unsorted label ordinals
+   * @param dos
+   *          Stream where to write the labels.
+   * @throws IOException
+   *           When IOE during writes to Stream.
+   */
+  private static void writeLabelOrdinalsToStream(List<Integer> labelOrdinals, DataOutputStream dos)
+      throws IOException {
+    Collections.sort(labelOrdinals);
+    for (Integer labelOrdinal : labelOrdinals) {
+      StreamUtils.writeRawVInt32(dos, labelOrdinal);
+    }
+  }
+  private static void checkAuths(Set<Integer> auths, int labelOrdinal, String identifier,
+      boolean checkAuths) throws IOException {
+    if (checkAuths) {
+      if (auths == null || (!auths.contains(labelOrdinal))) {
+        throw new AccessDeniedException("Visibility label " + identifier
+            + " not authorized for the user " + VisibilityUtils.getActiveUser().getShortName());
+      }
+    }
+  }
 }

TsvImporterCustomTestMapperForOprAttr.java (modified)

@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 public class TsvImporterCustomTestMapperForOprAttr extends TsvImporterMapper {
   @Override
-  protected KeyValue createPuts(byte[] lineBytes, ParsedLine parsed, Put put, int i)
+  protected void populatePut(byte[] lineBytes, ParsedLine parsed, Put put, int i)
       throws BadTsvLineException, IOException {
     KeyValue kv;
     kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
@@ -54,6 +54,5 @@ public class TsvImporterCustomTestMapperForOprAttr extends TsvImporterMapper {
       }
     }
     put.add(kv);
-    return kv;
   }
 }