NIFI-817 Processors for interacting with HBase

- Refactoring PutHBaseCell to batch Puts by table
- Adding optional Columns property to GetHBase to return only selected column families or columns
- Making GetHBase cluster friendly by storing state in the distributed cache and a local file
- Adding Initial Time Range property to GetHBase
- Adding Filter Expression property and custom validate to prevent using columns and a filter at the same time
- Creating an HBaseClientService controller service to isolate the HBase client and support multiple versions
- Creating appropriate LICENSE/NOTICE files
- Adding @InputRequirement to processors
- Addressing comments from review, moving hbase client services under standard services
- Making sure result of session.penalize() is assigned to FlowFile variable before transferring
Bryan Bende 2015-10-02 17:22:29 -04:00
parent 2a90bd501b
commit e748fd5848
41 changed files with 4263 additions and 2 deletions
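
The central piece of this change is the new HBaseClientService controller service, which isolates the HBase client behind an interface so multiple client versions can be supported. The interface file itself is not part of this excerpt; inferred from its call sites in GetHBase, PutHBaseCell, and MockHBaseClientService below, it looks roughly like this sketch:

package org.apache.nifi.hbase;

import java.io.IOException;
import java.util.Collection;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultHandler;

public interface HBaseClientService extends ControllerService {

    // Sends a batch of puts to the given table; PutHBaseCell groups FlowFiles by table before calling this.
    void put(String tableName, Collection<PutFlowFile> puts) throws IOException;

    // Scans the given table, invoking the handler for each row with cells at or after minTime.
    void scan(String tableName, Collection<Column> columns, String filterExpression,
            long minTime, ResultHandler handler) throws IOException;
}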


@@ -792,6 +792,94 @@ This product bundles HexViewJS available under an MIT License
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
The binary distribution of this product bundles 'Jcodings' under an MIT style
license.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
The binary distribution of this product bundles 'Joni' under an MIT style
license.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
The binary distribution of this product bundles 'Google Protocol Buffers Java 2.5.0'
which is licensed under a BSD license.
This license applies to all parts of Protocol Buffers except the following:
- Atomicops support for generic gcc, located in
src/google/protobuf/stubs/atomicops_internals_generic_gcc.h.
This file is copyrighted by Red Hat Inc.
- Atomicops support for AIX/POWER, located in
src/google/protobuf/stubs/atomicops_internals_aix.h.
This file is copyrighted by Bloomberg Finance LP.
Copyright 2014, Google Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Code generated by the Protocol Buffer compiler is owned by the owner
of the input file used when generating it. This code is not
standalone and requires a support library to be linked with it. This
support library is itself covered by the above license.
This product bundles 'JCraft Jzlib' which is available under a 3-Clause BSD License.
Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc.


@@ -746,6 +746,25 @@ The following binary components are provided under the Apache Software License v
Couchbase Java SDK
Copyright 2012 Netflix, Inc.
(ASLv2) HBase Common
The following NOTICE information applies:
This product includes portions of the Guava project v14, specifically
'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
Copyright (C) 2007 The Guava Authors
Licensed under the Apache License, Version 2.0
(ASLv2) HTrace Core
The following NOTICE information applies:
In addition, this product includes software dependencies. See
the accompanying LICENSE.txt for a listing of dependencies
that are NOT Apache licensed (with pointers to their licensing)
Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
************************
Common Development and Distribution License 1.1
************************


@@ -232,6 +232,16 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-couchbase-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<properties>


@@ -26,4 +26,15 @@
<modules>
<module>nifi-hadoop-libraries-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<!-- the top-level pom forces 18.0, but Hadoop 2.6 expects 12.0.1 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${hadoop.guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>


@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-bundle</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hbase-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-processors</artifactId>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,19 @@
nifi-hbase-nar
Copyright 2014-2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2015 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())


@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-bundle</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hbase-processors</artifactId>
<description>Support for interacting with HBase</description>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-client-service-api</artifactId>
<version>0.4.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<version>0.4.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.5.4</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.processor.AbstractProcessor;
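// Empty for now: a shared base for this bundle's processors. In this commit only GetHBase
// extends it; PutHBaseCell extends AbstractProcessor directly.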
public abstract class AbstractHBaseProcessor extends AbstractProcessor {
}


@@ -0,0 +1,547 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.io.JsonRowSerializer;
import org.apache.nifi.hbase.io.RowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.ObjectHolder;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@TriggerWhenEmpty
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"hbase", "get", "ingest"})
@CapabilityDescription("This Processor polls HBase for any records in the specified table. The processor keeps track of the timestamp of the cells that "
+ "it receives, so that as new records are pushed to HBase, they will automatically be pulled. Each record is output in JSON format, as "
+ "{\"row\": \"<row key>\", \"cells\": { \"<column 1 family>:<column 1 qualifier>\": \"<cell 1 value>\", \"<column 2 family>:<column 2 qualifier>\": \"<cell 2 value>\", ... }}. "
+ "For each record received, a Provenance RECEIVE event is emitted with the format hbase://<table name>/<row key>, where <row key> is the UTF-8 encoded value of the row's key.")
@WritesAttributes({
@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"),
@WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON")
})
public class GetHBase extends AbstractHBaseProcessor {
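// Matches a comma-separated list of "family" or "family:qualifier" entries, e.g. "cf1,cf2:cq1,cf3".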
static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
static final AllowableValue NONE = new AllowableValue("None", "None");
static final AllowableValue CURRENT_TIME = new AllowableValue("Current Time", "Current Time");
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" +
" so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
.required(true)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies which character set is used to encode the data in HBase")
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the HBase Table to put data into")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder()
.name("Columns")
.description("A comma-separated list of \"<colFamily>:<colQualifier>\" pairs to return when scanning. To return all columns " +
"for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.createRegexMatchingValidator(COLUMNS_PATTERN))
.build();
static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder()
.name("Filter Expression")
.description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor INITIAL_TIMERANGE = new PropertyDescriptor.Builder()
.name("Initial Time Range")
.description("The time range to use on the first scan of a table. None will pull the entire table on the first scan, " +
"Current Time will pull entries from that point forward.")
.required(true)
.expressionLanguageSupported(false)
.allowableValues(NONE, CURRENT_TIME)
.defaultValue(NONE.getValue())
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are routed to this relationship")
.build();
private volatile ScanResult lastResult = null;
private volatile List<Column> columns = new ArrayList<>();
private volatile boolean electedPrimaryNode = false;
private volatile String previousTable = null;
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(TABLE_NAME);
properties.add(COLUMNS);
properties.add(FILTER_EXPRESSION);
properties.add(INITIAL_TIMERANGE);
properties.add(CHARSET);
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final String columns = validationContext.getProperty(COLUMNS).getValue();
final String filter = validationContext.getProperty(FILTER_EXPRESSION).getValue();
final List<ValidationResult> problems = new ArrayList<>();
if (!StringUtils.isBlank(columns) && !StringUtils.isBlank(filter)) {
problems.add(new ValidationResult.Builder()
.subject(FILTER_EXPRESSION.getDisplayName())
.input(filter).valid(false)
.explanation("a filter expression can not be used in conjunction with the Columns property")
.build());
}
return problems;
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (descriptor.equals(TABLE_NAME)) {
lastResult = null;
}
}
@OnScheduled
public void parseColumns(final ProcessContext context) {
final String columnsValue = context.getProperty(COLUMNS).getValue();
final String[] columns = (columnsValue == null || columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
this.columns.clear();
for (final String column : columns) {
if (column.contains(":")) {
final String[] parts = column.split(":");
final byte[] cf = parts[0].getBytes(Charset.forName("UTF-8"));
final byte[] cq = parts[1].getBytes(Charset.forName("UTF-8"));
this.columns.add(new Column(cf, cq));
} else {
final byte[] cf = column.getBytes(Charset.forName("UTF-8"));
this.columns.add(new Column(cf, null));
}
}
}
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
electedPrimaryNode = true;
}
}
@OnRemoved
public void onRemoved(final ProcessContext context) {
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
clearState(client);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final String tableName = context.getProperty(TABLE_NAME).getValue();
final String initialTimeRange = context.getProperty(INITIAL_TIMERANGE).getValue();
final String filterExpression = context.getProperty(FILTER_EXPRESSION).getValue();
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
// if the table was changed then remove any previous state
if (previousTable != null && !tableName.equals(previousTable)) {
clearState(client);
}
previousTable = tableName;
try {
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
final RowSerializer serializer = new JsonRowSerializer(charset);
this.lastResult = getState(client);
final long defaultMinTime = (initialTimeRange.equals(NONE.getValue()) ? 0L : System.currentTimeMillis());
final long minTime = (lastResult == null ? defaultMinTime : lastResult.getTimestamp());
final Map<String, Set<String>> cellsMatchingTimestamp = new HashMap<>();
final ObjectHolder<Long> rowsPulledHolder = new ObjectHolder<>(0L);
final ObjectHolder<Long> latestTimestampHolder = new ObjectHolder<>(minTime);
hBaseClientService.scan(tableName, columns, filterExpression, minTime, new ResultHandler() {
@Override
public void handle(final byte[] rowKey, final ResultCell[] resultCells) {
final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8);
// check if latest cell timestamp is equal to our cutoff.
// if any of the cells have a timestamp later than our cutoff, then we
// want the row. But if the cell with the latest timestamp is equal to
// our cutoff, then we want to check if that's one of the cells that
// we have already seen.
long latestCellTimestamp = 0L;
for (final ResultCell cell : resultCells) {
if (cell.getTimestamp() > latestCellTimestamp) {
latestCellTimestamp = cell.getTimestamp();
}
}
// we've already seen this.
if (latestCellTimestamp < minTime) {
getLogger().debug("latest cell timestamp for row {} is {}, which is earlier than the minimum time of {}",
new Object[] {rowKeyString, latestCellTimestamp, minTime});
return;
}
if (latestCellTimestamp == minTime) {
// latest cell timestamp is equal to our minimum time. Check if all cells that have
// that timestamp are in our list of previously seen cells.
boolean allSeen = true;
for (final ResultCell cell : resultCells) {
if (cell.getTimestamp() == latestCellTimestamp) {
if (lastResult == null || !lastResult.contains(cell)) {
allSeen = false;
break;
}
}
}
if (allSeen) {
// we have already seen all of the cells for this row. We do not want to
// include this cell in our output.
getLogger().debug("all cells for row {} have already been seen", new Object[] { rowKeyString });
return;
}
}
// If the latest timestamp of the cell is later than the latest timestamp we have already seen,
// we want to keep track of the cells that match this timestamp so that the next time we scan,
// we can ignore these cells.
if (latestCellTimestamp >= latestTimestampHolder.get()) {
// new timestamp, so clear all of the 'matching cells'
if (latestCellTimestamp > latestTimestampHolder.get()) {
latestTimestampHolder.set(latestCellTimestamp);
cellsMatchingTimestamp.clear();
}
for (final ResultCell cell : resultCells) {
final long ts = cell.getTimestamp();
if (ts == latestCellTimestamp) {
final byte[] rowValue = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
final String rowHash = new String(rowValue, StandardCharsets.UTF_8);
Set<String> cellHashes = cellsMatchingTimestamp.get(rowHash);
if (cellHashes == null) {
cellHashes = new HashSet<>();
cellsMatchingTimestamp.put(rowHash, cellHashes);
}
cellHashes.add(new String(cellValue, StandardCharsets.UTF_8));
}
}
}
// write the row to a new FlowFile.
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
serializer.serialize(rowKey, resultCells, out);
}
});
final Map<String, String> attributes = new HashMap<>();
attributes.put("hbase.table", tableName);
attributes.put("mime.type", "application/json");
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, "hbase://" + tableName + "/" + rowKeyString);
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});
// we could potentially have a huge number of rows. If we get to 500, go ahead and commit the
// session so that we can avoid buffering tons of FlowFiles without ever sending any out.
long rowsPulled = rowsPulledHolder.get();
rowsPulledHolder.set(++rowsPulled);
if (rowsPulled % getBatchSize() == 0) {
session.commit();
}
}
});
final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp);
// Commit session before we replace the lastResult; if session commit fails, we want
// to pull these records again.
session.commit();
if (lastResult == null || scanResults.getTimestamp() > lastResult.getTimestamp()) {
lastResult = scanResults;
} else if (scanResults.getTimestamp() == lastResult.getTimestamp()) {
final Map<String, Set<String>> combinedResults = new HashMap<>();
// copy the results of result.getMatchingCells() to combinedResults.
// do a deep copy because the Set may be modified below.
for (final Map.Entry<String, Set<String>> entry : scanResults.getMatchingCells().entrySet()) {
combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue()));
}
// combine in the results from 'lastResult'
for (final Map.Entry<String, Set<String>> entry : lastResult.getMatchingCells().entrySet()) {
final Set<String> existing = combinedResults.get(entry.getKey());
if (existing == null) {
combinedResults.put(entry.getKey(), new HashSet<>(entry.getValue()));
} else {
existing.addAll(entry.getValue());
}
}
final ScanResult scanResult = new ScanResult(scanResults.getTimestamp(), combinedResults);
lastResult = scanResult;
}
// save state to local storage and to distributed cache
persistState(client, lastResult);
} catch (final IOException e) {
getLogger().error("Failed to receive data from HBase due to {}", e);
session.rollback();
} finally {
// if we failed, we want to yield so that we don't hammer hbase. If we succeed, then we have
// pulled all of the records, so we want to wait a bit before hitting hbase again anyway.
context.yield();
}
}
// present for tests
protected int getBatchSize() {
return 500;
}
protected File getStateDir() {
return new File("conf/state");
}
protected File getStateFile() {
return new File(getStateDir(), "getHBase-" + getIdentifier());
}
protected String getKey() {
return "getHBase-" + getIdentifier() + "-state";
}
protected List<Column> getColumns() {
return columns;
}
private void persistState(final DistributedMapCacheClient client, final ScanResult scanResult) {
final File stateDir = getStateDir();
if (!stateDir.exists()) {
stateDir.mkdirs();
}
final File file = getStateFile();
try (final OutputStream fos = new FileOutputStream(file);
final ObjectOutputStream oos = new ObjectOutputStream(fos)) {
oos.writeObject(scanResult);
} catch (final IOException ioe) {
getLogger().warn("Unable to save state locally. If the node is restarted now, data may be duplicated. Failure is due to {}", ioe);
}
try {
client.put(getKey(), scanResult, new StringSerDe(), new ObjectSerDe());
} catch (final IOException ioe) {
getLogger().warn("Unable to communicate with distributed cache server due to {}. Persisting state locally instead.", ioe);
}
}
private void clearState(final DistributedMapCacheClient client) {
final File localState = getStateFile();
if (localState.exists()) {
localState.delete();
}
try {
client.remove(getKey(), new StringSerDe());
} catch (IOException e) {
getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e});
}
}
private ScanResult getState(final DistributedMapCacheClient client) throws IOException {
final StringSerDe stringSerDe = new StringSerDe();
final ObjectSerDe objectSerDe = new ObjectSerDe();
ScanResult scanResult = lastResult;
// if we have no previous result, or we just became primary, pull from distributed cache
if (scanResult == null || electedPrimaryNode) {
final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
if (obj == null || !(obj instanceof ScanResult)) {
scanResult = null;
} else {
scanResult = (ScanResult) obj;
getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}" , new Object[] {scanResult.getTimestamp()});
}
// no requirement to pull an update from the distributed cache anymore.
electedPrimaryNode = false;
}
// Check the persistence file. We want to use the latest timestamp that we have so that
// we don't duplicate data.
final File file = getStateFile();
if (file.exists()) {
try (final InputStream fis = new FileInputStream(file);
final ObjectInputStream ois = new ObjectInputStream(fis)) {
final Object obj = ois.readObject();
if (obj != null && (obj instanceof ScanResult)) {
final ScanResult localScanResult = (ScanResult) obj;
if (scanResult == null || localScanResult.getTimestamp() > scanResult.getTimestamp()) {
scanResult = localScanResult;
getLogger().debug("Using last timestamp from local state because it was newer than the distributed cache, or no value existed in the cache");
// Our local persistence file shows a later time than the Distributed service.
// Update the distributed service to match our local state.
try {
client.put(getKey(), localScanResult, stringSerDe, objectSerDe);
} catch (final IOException ioe) {
getLogger().warn("Local timestamp is {}, which is later than Distributed state but failed to update Distributed "
+ "state due to {}. If a new node performs GetHBase Listing, data duplication may occur",
new Object[] {localScanResult.getTimestamp(), ioe});
}
}
}
} catch (final IOException | ClassNotFoundException ioe) {
getLogger().warn("Failed to recover persisted state from {} due to {}. Assuming that state from distributed cache is correct.", new Object[]{file, ioe});
}
}
return scanResult;
}
public static class ScanResult implements Serializable {
private static final long serialVersionUID = 1L;
private final long latestTimestamp;
private final Map<String, Set<String>> matchingCellHashes;
public ScanResult(final long timestamp, final Map<String, Set<String>> cellHashes) {
latestTimestamp = timestamp;
matchingCellHashes = cellHashes;
}
public long getTimestamp() {
return latestTimestamp;
}
public Map<String, Set<String>> getMatchingCells() {
return matchingCellHashes;
}
public boolean contains(final ResultCell cell) {
if (cell.getTimestamp() != latestTimestamp) {
return false;
}
final byte[] row = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
final String rowHash = new String(row, StandardCharsets.UTF_8);
final Set<String> cellHashes = matchingCellHashes.get(rowHash);
if (cellHashes == null) {
return false;
}
final byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
final String cellHash = new String(cellValue, StandardCharsets.UTF_8);
return cellHashes.contains(cellHash);
}
}
}
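
A concrete trace of the timestamp bookkeeping above, assuming two scans and rows written in the same millisecond: scan 1 returns rows A and B whose newest cells are stamped t=100, so both are emitted, the latest timestamp becomes 100, and every cell value at t=100 is recorded in the matching-cells map. Scan 2 runs with minTime=100 and sees A and B again; since all of their t=100 cells are already recorded, ScanResult.contains() reports them as seen and they are skipped. A row C written at t=100 between the two scans has unseen cells, so it is still emitted rather than lost, which is why the cells at the cutoff timestamp must be remembered rather than the timestamp alone.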


@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase"})
@CapabilityDescription("Adds the Contents of a FlowFile to HBase as the value of a single cell")
public class PutHBaseCell extends AbstractProcessor {
protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("Specifies the Controller Service to use for accessing HBase.")
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
.description("The name of the HBase Table to put data into")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor ROW = new PropertyDescriptor.Builder()
.name("Row Identifier")
.description("Specifies the Row ID to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder()
.name("Column Family")
.description("The Column Family to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder()
.name("Column Qualifier")
.description("The Column Qualifier to use when inserting data into HBase")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +
"grouped by table, and a single Put per table will be performed.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("25")
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("A FlowFile is routed to this relationship after it has been successfully stored in HBase")
.build();
static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to HBase")
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
properties.add(TABLE_NAME);
properties.add(ROW);
properties.add(COLUMN_FAMILY);
properties.add(COLUMN_QUALIFIER);
properties.add(BATCH_SIZE);
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(FAILURE);
return rels;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles == null || flowFiles.size() == 0) {
return;
}
final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>();
// Group FlowFiles by HBase Table
for (final FlowFile flowFile : flowFiles) {
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String row = context.getProperty(ROW).evaluateAttributeExpressions(flowFile).getValue();
final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || StringUtils.isBlank(columnFamily) || StringUtils.isBlank(columnQualifier)) {
getLogger().error("Invalid FlowFile {} missing table, row, column familiy, or column qualifier; routing to failure", new Object[]{flowFile});
session.transfer(flowFile, FAILURE);
} else {
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier, buffer, flowFile);
List<PutFlowFile> putFlowFiles = tablePuts.get(tableName);
if (putFlowFiles == null) {
putFlowFiles = new ArrayList<>();
tablePuts.put(tableName, putFlowFiles);
}
putFlowFiles.add(putFlowFile);
}
}
getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[] {flowFiles.size(), tablePuts.size()});
final long start = System.nanoTime();
final List<PutFlowFile> successes = new ArrayList<>();
final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) {
try {
hBaseClientService.put(entry.getKey(), entry.getValue());
successes.addAll(entry.getValue());
} catch (Exception e) {
getLogger().error(e.getMessage(), e);
for (PutFlowFile putFlowFile : entry.getValue()) {
getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
session.transfer(failure, FAILURE);
}
}
}
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[] {successes.size(), sendMillis});
for (PutFlowFile putFlowFile : successes) {
session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS);
session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), sendMillis);
}
}
protected String getTransitUri(PutFlowFile putFlowFile) {
return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow() + "/" + putFlowFile.getColumnFamily()
+ ":" + putFlowFile.getColumnQualifier();
}
}
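
Since Table Name, Row Identifier, Column Family, and Column Qualifier all support expression language, a typical flow drives them from FlowFile attributes. A hypothetical configuration (the attribute names are invented for illustration):

Table Name       = ${hbase.table}
Row Identifier   = ${hbase.row}
Column Family    = cf
Column Qualifier = ${filename}
Batch Size       = 25

Each onTrigger then pulls up to 25 FlowFiles, groups them by the evaluated table name, and issues a single put per table, which is the batching behavior described in the commit message.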


@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.io;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.nifi.hbase.scan.ResultCell;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
public class JsonRowSerializer implements RowSerializer {
private final Charset charset;
public JsonRowSerializer(final Charset charset) {
this.charset = charset;
}
@Override
public void serialize(final byte[] rowKey, final ResultCell[] cells, final OutputStream out) throws IOException {
final StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
final String row = new String(rowKey, charset);
jsonBuilder.append("\"row\":")
.append("\"")
.append(StringEscapeUtils.escapeJson(row))
.append("\"");
jsonBuilder.append(", \"cells\": {");
int i = 0;
for (final ResultCell cell : cells) {
final String cellFamily = new String(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), charset);
final String cellQualifier = new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), charset);
if (i > 0) {
jsonBuilder.append(", ");
}
jsonBuilder.append("\"")
.append(StringEscapeUtils.escapeJson(cellFamily))
.append(":")
.append(StringEscapeUtils.escapeJson(cellQualifier))
.append("\":\"")
.append(StringEscapeUtils.escapeJson(new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), charset)))
.append("\"");
i++;
}
jsonBuilder.append("}}");
final String json = jsonBuilder.toString();
out.write(json.getBytes(charset));
}
}
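
For reference, a row with key "row1" and two cells in column family "nifi" serializes to one JSON object per row, exactly as the builder above emits it (cell order follows the array order):

{"row":"row1", "cells": {"nifi:greeting":"hello", "nifi:name":"nifi"}}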


@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.io;
import org.apache.nifi.hbase.scan.ResultCell;
import java.io.IOException;
import java.io.OutputStream;
public interface RowSerializer {
/**
* Serializes the given row and cells to the provided OutputStream
*
* @param rowKey the row's key
* @param cells the cells to serialize
* @param out the OutputStream to serialize to
* @throws IOException if unable to serialize the row
*/
void serialize(byte[] rowKey, ResultCell[] cells, OutputStream out) throws IOException;
}


@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.util;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
public class ObjectSerDe implements Serializer<Object>, Deserializer<Object> {
@Override
public Object deserialize(byte[] input) throws DeserializationException, IOException {
if (input == null || input.length == 0) {
return null;
}
try (final ByteArrayInputStream in = new ByteArrayInputStream(input);
final ObjectInputStream objIn = new ObjectInputStream(in)) {
return objIn.readObject();
} catch (ClassNotFoundException e) {
throw new DeserializationException("Could not deserialize object due to ClassNotFoundException", e);
}
}
@Override
public void serialize(Object value, OutputStream output) throws SerializationException, IOException {
try (final ByteArrayOutputStream bOut = new ByteArrayOutputStream();
final ObjectOutputStream objOut = new ObjectOutputStream(bOut)) {
objOut.writeObject(value);
output.write(bOut.toByteArray());
}
}
}
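
A minimal round-trip sketch of this SerDe, with a stand-in HashMap as the value (GetHBase actually stores its ScanResult this way; any Serializable works):

final ObjectSerDe serDe = new ObjectSerDe();
final HashMap<String, Long> state = new HashMap<>();             // import java.util.HashMap
state.put("row1", 100L);
final ByteArrayOutputStream bytes = new ByteArrayOutputStream(); // the nifi stream.io variant imported above
serDe.serialize(state, bytes);                                   // Java-serializes the map to the stream
final Object copy = serDe.deserialize(bytes.toByteArray());      // returns an equal HashMap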


@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.util;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
public class StringSerDe implements Serializer<String>, Deserializer<String> {
@Override
public String deserialize(final byte[] value) throws DeserializationException, IOException {
if ( value == null ) {
return null;
}
return new String(value, StandardCharsets.UTF_8);
}
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}


@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
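# The entries below register the bundle's processors with NiFi's ServiceLoader-based extension discovery.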
org.apache.nifi.hbase.GetHBase
org.apache.nifi.hbase.PutHBaseCell


@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService {
private Map<String,ResultCell[]> results = new HashMap<>();
private Map<String, List<PutFlowFile>> puts = new HashMap<>();
private boolean throwException = false;
@Override
public void put(String tableName, Collection<PutFlowFile> puts) throws IOException {
if (throwException) {
throw new IOException("exception");
}
this.puts.put(tableName, new ArrayList<>(puts));
}
@Override
public void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException {
if (throwException) {
throw new IOException("exception");
}
// pass all the staged data to the handler
for (final Map.Entry<String,ResultCell[]> entry : results.entrySet()) {
handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue());
}
}
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
final ResultCell[] cellArray = new ResultCell[cells.size()];
int i = 0;
for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
final ResultCell cell = new ResultCell();
cell.setRowArray(rowArray);
cell.setRowOffset(0);
cell.setRowLength((short) rowArray.length);
final String cellValue = cellEntry.getValue();
final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8);
cell.setValueArray(valueArray);
cell.setValueOffset(0);
cell.setValueLength(valueArray.length);
final byte[] familyArray = "nifi".getBytes(StandardCharsets.UTF_8);
cell.setFamilyArray(familyArray);
cell.setFamilyOffset(0);
cell.setFamilyLength((byte) familyArray.length);
final String qualifier = cellEntry.getKey();
final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8);
cell.setQualifierArray(qualifierArray);
cell.setQualifierOffset(0);
cell.setQualifierLength(qualifierArray.length);
cell.setTimestamp(timestamp);
cellArray[i++] = cell;
}
results.put(rowKey, cellArray);
}
public Map<String, List<PutFlowFile>> getPuts() {
return puts;
}
public void setThrowException(boolean throwException) {
this.throwException = throwException;
}
}


@@ -0,0 +1,459 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TestGetHBase {
private TestRunner runner;
private MockGetHBase proc;
private MockCacheClient cacheClient;
private MockHBaseClientService hBaseClient;
@Before
public void setup() throws InitializationException {
proc = new MockGetHBase();
runner = TestRunners.newTestRunner(proc);
cacheClient = new MockCacheClient();
runner.addControllerService("cacheClient", cacheClient);
runner.enableControllerService(cacheClient);
hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(GetHBase.TABLE_NAME, "nifi");
runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
runner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
}
@After
public void cleanup() {
final File file = proc.getStateFile();
if (file.exists()) {
file.delete();
}
Assert.assertFalse(file.exists());
}
@Test
public void testColumnsValidation() {
runner.assertValid();
runner.setProperty(GetHBase.COLUMNS, "cf1:cq1");
runner.assertValid();
runner.setProperty(GetHBase.COLUMNS, "cf1");
runner.assertValid();
runner.setProperty(GetHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
runner.assertValid();
runner.setProperty(GetHBase.COLUMNS, "cf1,cf2:cq1,cf3");
runner.assertValid();
runner.setProperty(GetHBase.COLUMNS, "cf1 cf2,cf3");
runner.assertNotValid();
runner.setProperty(GetHBase.COLUMNS, "cf1:,cf2,cf3");
runner.assertNotValid();
runner.setProperty(GetHBase.COLUMNS, "cf1:cq1,");
runner.assertNotValid();
}
@Test
public void testRowCounts() {
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
hBaseClient.addResult("row4", cells, now + 1);
runner.run();
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
}
@Test
public void testPersistAndRecoverFromLocalState() throws InitializationException {
final File stateFile = new File("target/test-recover-state.bin");
if (!stateFile.delete() && stateFile.exists()) {
Assert.fail("Could not delete state file " + stateFile);
}
proc.setStateFile(stateFile);
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
hBaseClient.addResult("row4", cells, now + 1);
runner.run();
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
proc = new MockGetHBase(stateFile);
final TestRunner newRunner = TestRunners.newTestRunner(proc);
newRunner.addControllerService("cacheClient", cacheClient);
newRunner.enableControllerService(cacheClient);
newRunner.addControllerService("hbaseClient", hBaseClient);
newRunner.enableControllerService(hBaseClient);
newRunner.setProperty(GetHBase.TABLE_NAME, "nifi");
newRunner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
newRunner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
newRunner.run(100);
newRunner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
}
@Test
public void testBecomePrimaryWithNoLocalState() throws InitializationException {
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
hBaseClient.addResult("row4", cells, now + 1);
runner.run();
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
// delete the processor's local state to simulate becoming the primary node
// for the first time, should use the state from distributed cache
final File stateFile = proc.getStateFile();
if (!stateFile.delete() && stateFile.exists()) {
Assert.fail("Could not delete state file " + stateFile);
}
proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
hBaseClient.addResult("row4", cells, now + 1);
runner.clearTransferState();
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
}
@Test
public void testBecomePrimaryWithNewerLocalState() throws InitializationException {
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
// trick for testing so that row4 gets written to local state but not to the real cache
final MockCacheClient otherCacheClient = new MockCacheClient();
runner.addControllerService("otherCacheClient", otherCacheClient);
runner.enableControllerService(otherCacheClient);
runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "otherCacheClient");
hBaseClient.addResult("row4", cells, now + 1);
runner.run();
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
// switch back to the original cacheClient, which is missing row4
runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
// become the primary node, but we have existing local state with rows 0-4
// so we shouldn't get any output because we should use the local state
proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
hBaseClient.addResult("row4", cells, now + 1);
runner.clearTransferState();
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
}
@Test
public void testOnRemovedClearsState() throws IOException {
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
// should have a local state file and a cache entry before removing
Assert.assertTrue(proc.getStateFile().exists());
Assert.assertTrue(cacheClient.containsKey(proc.getKey(), new StringSerDe()));
proc.onRemoved(runner.getProcessContext());
// onRemoved should have cleared both
Assert.assertFalse(proc.getStateFile().exists());
Assert.assertFalse(cacheClient.containsKey(proc.getKey(), new StringSerDe()));
}
@Test
public void testChangeTableNameClearsState() {
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
// change the table name and run again, should get all the data coming out
// again because previous state will be wiped
runner.setProperty(GetHBase.TABLE_NAME, "otherTable");
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
hBaseClient.addResult("row3", cells, now);
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
}
@Test
public void testInitialTimeCurrentTime() {
runner.setProperty(GetHBase.INITIAL_TIMERANGE, GetHBase.CURRENT_TIME);
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
hBaseClient.addResult("row0", cells, now - 4000);
hBaseClient.addResult("row1", cells, now - 3000);
hBaseClient.addResult("row2", cells, now - 2000);
hBaseClient.addResult("row3", cells, now - 1000);
// should not get any output because the mock results have a time before current time
runner.run(100);
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
}
@Test
public void testParseColumns() {
runner.setProperty(GetHBase.COLUMNS, "cf1,cf2:cq1,cf3");
proc.parseColumns(runner.getProcessContext());
final List<Column> expectedCols = new ArrayList<>();
expectedCols.add(new Column("cf1".getBytes(Charset.forName("UTF-8")), null));
expectedCols.add(new Column("cf2".getBytes(Charset.forName("UTF-8")), "cq1".getBytes(Charset.forName("UTF-8"))));
expectedCols.add(new Column("cf3".getBytes(Charset.forName("UTF-8")), null));
final List<Column> actualColumns = proc.getColumns();
Assert.assertNotNull(actualColumns);
Assert.assertEquals(expectedCols.size(), actualColumns.size());
for (final Column expectedCol : expectedCols) {
boolean found = false;
for (final Column providedCol : actualColumns) {
if (expectedCol.equals(providedCol)) {
found = true;
break;
}
}
Assert.assertTrue("Didn't find expected column", found);
}
}
@Test
public void testCustomValidate() throws CharacterCodingException {
runner.setProperty(GetHBase.FILTER_EXPRESSION, "PrefixFilter ('Row') AND PageFilter (1) AND FirstKeyOnlyFilter ()");
runner.assertValid();
runner.setProperty(GetHBase.COLUMNS, "colA");
runner.assertNotValid();
}
// Mock processor to override the location of the state file
private static class MockGetHBase extends GetHBase {
private static final String DEFAULT_STATE_FILE_NAME = "target/TestGetHBase.bin";
private File stateFile;
public MockGetHBase() {
this(new File(DEFAULT_STATE_FILE_NAME));
}
public MockGetHBase(final File stateFile) {
this.stateFile = stateFile;
}
public void setStateFile(final File stateFile) {
this.stateFile = stateFile;
}
@Override
protected int getBatchSize() {
return 2;
}
@Override
protected File getStateDir() {
return new File("target");
}
@Override
protected File getStateFile() {
return stateFile;
}
}
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false;
private void verifyNotFail() throws IOException {
if (failOnCalls) {
throw new IOException("Could not reach the remote cacheClient because the unit test marked it unavailable");
}
}
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
verifyNotFail();
final Object retValue = values.putIfAbsent(key, value);
return (retValue == null);
}
@Override
@SuppressWarnings("unchecked")
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
return (V) values.putIfAbsent(key, value);
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
verifyNotFail();
return values.containsKey(key);
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
verifyNotFail();
values.put(key, value);
}
@Override
@SuppressWarnings("unchecked")
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
return (V) values.get(key);
}
@Override
public void close() throws IOException {
}
@Override
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
verifyNotFail();
values.remove(key);
return true;
}
}
}
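The two become-primary tests above pin down a recovery order without showing the processor code: with no local state, GetHBase falls back to the distributed cache; with local state at least as new as the cached copy, the local copy wins. A minimal sketch of that decision, assuming a hypothetical State holder carrying only a timestamp (the real processor tracks richer scan state):

public class StateRecoverySketch {

    // hypothetical, stripped-down state holder; only the timestamp matters here
    static class State {
        final long timestamp;
        State(final long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // resolve which state to resume from, mirroring the expectations of
    // testBecomePrimaryWithNoLocalState and testBecomePrimaryWithNewerLocalState
    static State resolve(final State local, final State cached) {
        if (local == null) {
            return cached; // no local file: trust the distributed cache
        }
        if (cached == null || local.timestamp >= cached.timestamp) {
            return local;  // local state exists and is at least as new: use it
        }
        return cached;
    }
}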

@@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestPutHBaseCell {
@Test
public void testSingleFlowFile() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
runner.setProperty(PutHBaseCell.ROW, row);
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
final String content = "some content";
runner.enqueue(content.getBytes("UTF-8"));
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
}
@Test
public void testSingleFlowFileWithEL() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final PutHBaseCell proc = new PutHBaseCell();
final TestRunner runner = getTestRunnerWithEL(proc);
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
final String content = "some content";
final Map<String, String> attributes = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier);
runner.enqueue(content.getBytes("UTF-8"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
}
@Test
public void testSingleFlowFileWithELMissingAttributes() throws IOException, InitializationException {
final PutHBaseCell proc = new PutHBaseCell();
final TestRunner runner = getTestRunnerWithEL(proc);
runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
getHBaseClientService(runner);
final String content = "some content";
runner.enqueue(content.getBytes("UTF-8"), new HashMap<String, String>());
runner.run();
runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 0);
runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
}
@Test
public void testMultipleFlowFileWithELOneMissingAttributes() throws IOException, InitializationException {
final PutHBaseCell proc = new PutHBaseCell();
final TestRunner runner = getTestRunnerWithEL(proc);
runner.setProperty(PutHBaseCell.BATCH_SIZE, "10");
getHBaseClientService(runner);
// this one will go to failure
final String content = "some content";
runner.enqueue(content.getBytes("UTF-8"), new HashMap<String, String>());
// this will go to success
final String content2 = "some content2";
final Map<String, String> attributes = getAtrributeMapWithEL("table", "row", "cf", "cq");
runner.enqueue(content2.getBytes("UTF-8"), attributes);
runner.run();
runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 1);
runner.assertTransferCount(PutHBaseCell.FAILURE, 1);
}
@Test
public void testMultipleFlowFilesSameTableDifferentRow() throws IOException, InitializationException {
final String tableName = "nifi";
final String row1 = "row1";
final String row2 = "row2";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final PutHBaseCell proc = new PutHBaseCell();
final TestRunner runner = getTestRunnerWithEL(proc);
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
final String content1 = "some content1";
final Map<String, String> attributes1 = getAttributeMapWithEL(tableName, row1, columnFamily, columnQualifier);
runner.enqueue(content1.getBytes("UTF-8"), attributes1);
final String content2 = "some content1";
final Map<String, String> attributes2 = getAtrributeMapWithEL(tableName, row2, columnFamily, columnQualifier);
runner.enqueue(content2.getBytes("UTF-8"), attributes2);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content1);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1));
}
@Test
public void testMultipleFlowFilesSameTableDifferentRowFailure() throws IOException, InitializationException {
final String tableName = "nifi";
final String row1 = "row1";
final String row2 = "row2";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final PutHBaseCell proc = new PutHBaseCell();
final TestRunner runner = getTestRunnerWithEL(proc);
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
hBaseClient.setThrowException(true);
final String content1 = "some content1";
final Map<String, String> attributes1 = getAttributeMapWithEL(tableName, row1, columnFamily, columnQualifier);
runner.enqueue(content1.getBytes("UTF-8"), attributes1);
final String content2 = "some content1";
final Map<String, String> attributes2 = getAtrributeMapWithEL(tableName, row2, columnFamily, columnQualifier);
runner.enqueue(content2.getBytes("UTF-8"), attributes2);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.FAILURE, 2);
}
@Test
public void testMultipleFlowFilesSameTableSameRow() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final PutHBaseCell proc = new PutHBaseCell();
final TestRunner runner = getTestRunnerWithEL(proc);
final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
final String content1 = "some content1";
final Map<String, String> attributes1 = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier);
runner.enqueue(content1.getBytes("UTF-8"), attributes1);
final String content2 = "some content2";
runner.enqueue(content2.getBytes("UTF-8"), attributes1);
runner.run();
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
outFile.assertContentEquals(content1);
assertNotNull(hBaseClient.getPuts());
assertEquals(1, hBaseClient.getPuts().size());
List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName);
assertEquals(2, puts.size());
verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0));
verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1));
}
private Map<String, String> getAttributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
final Map<String,String> attributes1 = new HashMap<>();
attributes1.put("hbase.tableName", tableName);
attributes1.put("hbase.row", row);
attributes1.put("hbase.columnFamily", columnFamily);
attributes1.put("hbase.columnQualifier", columnQualifier);
return attributes1;
}
private TestRunner getTestRunnerWithEL(PutHBaseCell proc) {
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutHBaseCell.TABLE_NAME, "${hbase.tableName}");
runner.setProperty(PutHBaseCell.ROW, "${hbase.row}");
runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "${hbase.columnFamily}");
runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "${hbase.columnQualifier}");
return runner;
}
private MockHBaseClientService getHBaseClientService(TestRunner runner) throws InitializationException {
final MockHBaseClientService hBaseClient = new MockHBaseClientService();
runner.addControllerService("hbaseClient", hBaseClient);
runner.enableControllerService(hBaseClient);
runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient");
return hBaseClient;
}
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
assertEquals(row, put.getRow());
assertEquals(columnFamily, put.getColumnFamily());
assertEquals(columnQualifier, put.getColumnQualifier());
assertEquals(content, new String(put.getBuffer(), StandardCharsets.UTF_8));
}
}
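The same-table tests rely on the processor handing the client one put(...) call per table, with every PutFlowFile for that table collected into the list (getPuts() in the mock is keyed by table name). A hedged sketch of that grouping step, written independently of the PutHBaseCell code, which is not part of this hunk; BatchByTableSketch and flush are illustrative names:

import org.apache.nifi.hbase.HBaseClientService;
import org.apache.nifi.hbase.put.PutFlowFile;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchByTableSketch {

    // group a batch of PutFlowFiles by table, then issue one put(...) per table
    static void flush(final HBaseClientService client, final List<PutFlowFile> batch) throws IOException {
        final Map<String, List<PutFlowFile>> byTable = new HashMap<>();
        for (final PutFlowFile put : batch) {
            List<PutFlowFile> tablePuts = byTable.get(put.getTableName());
            if (tablePuts == null) {
                tablePuts = new ArrayList<>();
                byTable.put(put.getTableName(), tablePuts);
            }
            tablePuts.add(put);
        }
        for (final Map.Entry<String, List<PutFlowFile>> entry : byTable.entrySet()) {
            client.put(entry.getKey(), entry.getValue());
        }
    }
}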

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.util;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
public class TestObjectSerDe {
@Test
public void testDeserializeSuccessful() throws IOException {
final ObjectSerDe serDe = new ObjectSerDe();
final String myObject = "myObject";
final ByteArrayOutputStream bOut = new ByteArrayOutputStream();
final ObjectOutputStream out = new ObjectOutputStream(bOut);
out.writeObject(myObject);
byte[] myObjectBytes = bOut.toByteArray();
Assert.assertNotNull(myObjectBytes);
Assert.assertTrue(myObjectBytes.length > 0);
final Object deserialized = serDe.deserialize(myObjectBytes);
Assert.assertTrue(deserialized instanceof String);
Assert.assertEquals(myObject, deserialized);
}
@Test
public void testDeserializeNull() throws IOException {
final ObjectSerDe serDe = new ObjectSerDe();
final Object deserialized = serDe.deserialize(null);
Assert.assertNull(deserialized);
}
@Test
public void testSerialize() throws IOException, ClassNotFoundException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final String myObject = "myObject";
final ObjectSerDe serDe = new ObjectSerDe();
serDe.serialize(myObject, out);
final ByteArrayInputStream bIn = new ByteArrayInputStream(out.toByteArray());
final ObjectInputStream in = new ObjectInputStream(bIn);
final Object deserialized = in.readObject();
Assert.assertTrue(deserialized instanceof String);
Assert.assertEquals(myObject, deserialized);
}
}
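ObjectSerDe itself is not part of this hunk; an implementation consistent with these three tests would wrap plain Java serialization behind NiFi's Serializer/Deserializer contract and map null input to null. A sketch under that assumption (the shipped class may differ in details):

package org.apache.nifi.hbase.util;

import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;

public class ObjectSerDe implements Serializer<Object>, Deserializer<Object> {

    @Override
    public void serialize(final Object value, final OutputStream output) throws IOException {
        // plain Java serialization, as TestObjectSerDe round-trips through ObjectInputStream
        final ObjectOutputStream objOut = new ObjectOutputStream(output);
        objOut.writeObject(value);
        objOut.flush();
    }

    @Override
    public Object deserialize(final byte[] input) throws IOException {
        if (input == null) {
            return null; // testDeserializeNull expects null back for null input
        }
        try (final ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(input))) {
            return objIn.readObject();
        } catch (final ClassNotFoundException e) {
            throw new IOException("Could not deserialize object", e);
        }
    }
}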

@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hbase-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-hbase-processors</module>
<module>nifi-hbase-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-processors</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.1.2</version>
</dependency>
<!-- the top-level pom forces 18.0, but HBase 1.1.2 expects 12.0.1 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${hadoop.guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hbase-client-service-api</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.hbase.validate.ConfigFilesValidator;
import java.io.IOException;
import java.util.Collection;
@Tags({"hbase", "client"})
@CapabilityDescription("A controller service for accessing an HBase client.")
public interface HBaseClientService extends ControllerService {
PropertyDescriptor HADOOP_CONF_FILES = new PropertyDescriptor.Builder()
.name("Hadoop Configuration Files")
.description("Comma-separated list of Hadoop Configuration files, such as hbase-site.xml")
.required(true)
.defaultValue("./conf/hbase-site.xml")
.addValidator(new ConfigFilesValidator())
.build();
/**
* Puts a batch of mutations to the given table.
*
* @param tableName the name of an HBase table
* @param puts a list of put mutations for the given table
* @throws IOException thrown when there are communication errors with HBase
*/
void put(String tableName, Collection<PutFlowFile> puts) throws IOException;
/**
* Scans the given table using the optional filter criteria, passing each result to the provided handler.
*
* @param tableName the name of an HBase table to scan
* @param columns optional columns to return, if not specified all columns are returned
* @param filterExpression optional filter expression, if not specified no filtering is performed
* @param minTime the minimum timestamp of cells to return, passed to the HBase scanner timeRange
* @param handler a handler to process rows of the result set
* @throws IOException thrown when there are communication errors with HBase
*/
void scan(String tableName, Collection<Column> columns, String filterExpression, long minTime, ResultHandler handler) throws IOException;
}
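For a sense of how a processor drives this contract end to end, here is a hedged round-trip sketch: write one cell with put(...), then read it back through scan(...). Only put, scan, PutFlowFile, Column, ResultCell, and ResultHandler come from this bundle; the class and method names are illustrative, and the null FlowFile is a shortcut a real processor would not take:

package org.apache.nifi.hbase;

import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class HBaseClientServiceSketch {

    static void roundTrip(final HBaseClientService hBaseClient) throws IOException {
        // write a single cell: table "nifi", row "row1", cf:greeting = hello
        final byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        hBaseClient.put("nifi", Collections.singletonList(
                new PutFlowFile("nifi", "row1", "cf", "greeting", value, null)));

        // scan only cf:greeting (null filter expression, minTime 0 = no time restriction)
        final List<Column> columns = Collections.singletonList(new Column(
                "cf".getBytes(StandardCharsets.UTF_8), "greeting".getBytes(StandardCharsets.UTF_8)));
        hBaseClient.scan("nifi", columns, null, 0L, new ResultHandler() {
            @Override
            public void handle(final byte[] row, final ResultCell[] resultCells) {
                System.out.println(new String(row, StandardCharsets.UTF_8) + ": " + resultCells.length + " cells");
            }
        });
    }
}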

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.put;
import org.apache.nifi.flowfile.FlowFile;
/**
* Wrapper to encapsulate all of the information for an HBase Put along with the originating FlowFile.
*/
public class PutFlowFile {
private final String tableName;
private final String row;
private final String columnFamily;
private final String columnQualifier;
private final byte[] buffer;
private final FlowFile flowFile;
public PutFlowFile(String tableName, String row, String columnFamily, String columnQualifier,
byte[] buffer, FlowFile flowFile) {
this.tableName = tableName;
this.row = row;
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
this.buffer = buffer;
this.flowFile = flowFile;
}
public String getTableName() {
return tableName;
}
public String getRow() {
return row;
}
public String getColumnFamily() {
return columnFamily;
}
public String getColumnQualifier() {
return columnQualifier;
}
public byte[] getBuffer() {
return buffer;
}
public FlowFile getFlowFile() {
return flowFile;
}
}

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.scan;
import java.util.Arrays;
/**
* Wrapper to encapsulate a column family and qualifier.
*/
public class Column {
private final byte[] family;
private final byte[] qualifier;
public Column(byte[] family, byte[] qualifier) {
this.family = family;
this.qualifier = qualifier;
}
public byte[] getFamily() {
return family;
}
public byte[] getQualifier() {
return qualifier;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Column)) {
return false;
}
final Column other = (Column) obj;
return Arrays.equals(this.family, other.family)
&& Arrays.equals(this.qualifier, other.qualifier);
}
@Override
public int hashCode() {
int result = 37;
if (family != null) {
for (byte b : family) {
result += (int)b;
}
}
if (qualifier != null) {
for (byte b : qualifier) {
result += (int)b;
}
}
return result;
}
}
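A quick illustration of the equality semantics above: a null qualifier stands for a whole column family, and two Columns compare equal only when family and qualifier match byte-for-byte. A hypothetical snippet, not part of the commit:

import org.apache.nifi.hbase.scan.Column;

import java.nio.charset.StandardCharsets;

public class ColumnEqualitySketch {
    public static void main(final String[] args) {
        final byte[] cf = "cf1".getBytes(StandardCharsets.UTF_8);
        final Column wholeFamily = new Column(cf, null);
        final Column oneCell = new Column(cf, "cq1".getBytes(StandardCharsets.UTF_8));

        // true: same family bytes, both qualifiers null
        System.out.println(wholeFamily.equals(new Column("cf1".getBytes(StandardCharsets.UTF_8), null)));
        // false: a family-wide column is not equal to a specific cell column
        System.out.println(wholeFamily.equals(oneCell));
    }
}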

@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.scan;
public class ResultCell {
byte[] rowArray;
int rowOffset;
short rowLength;
byte[] familyArray;
int familyOffset;
byte familyLength;
byte[] qualifierArray;
int qualifierOffset;
int qualifierLength;
long timestamp;
byte typeByte;
long sequenceId;
byte[] valueArray;
int valueOffset;
int valueLength;
byte[] tagsArray;
int tagsOffset;
int tagsLength;
public byte[] getRowArray() {
return rowArray;
}
public void setRowArray(byte[] rowArray) {
this.rowArray = rowArray;
}
public int getRowOffset() {
return rowOffset;
}
public void setRowOffset(int rowOffset) {
this.rowOffset = rowOffset;
}
public short getRowLength() {
return rowLength;
}
public void setRowLength(short rowLength) {
this.rowLength = rowLength;
}
public byte[] getFamilyArray() {
return familyArray;
}
public void setFamilyArray(byte[] familyArray) {
this.familyArray = familyArray;
}
public int getFamilyOffset() {
return familyOffset;
}
public void setFamilyOffset(int familyOffset) {
this.familyOffset = familyOffset;
}
public byte getFamilyLength() {
return familyLength;
}
public void setFamilyLength(byte familyLength) {
this.familyLength = familyLength;
}
public byte[] getQualifierArray() {
return qualifierArray;
}
public void setQualifierArray(byte[] qualifierArray) {
this.qualifierArray = qualifierArray;
}
public int getQualifierOffset() {
return qualifierOffset;
}
public void setQualifierOffset(int qualifierOffset) {
this.qualifierOffset = qualifierOffset;
}
public int getQualifierLength() {
return qualifierLength;
}
public void setQualifierLength(int qualifierLength) {
this.qualifierLength = qualifierLength;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public byte getTypeByte() {
return typeByte;
}
public void setTypeByte(byte typeByte) {
this.typeByte = typeByte;
}
public long getSequenceId() {
return sequenceId;
}
public void setSequenceId(long sequenceId) {
this.sequenceId = sequenceId;
}
public byte[] getValueArray() {
return valueArray;
}
public void setValueArray(byte[] valueArray) {
this.valueArray = valueArray;
}
public int getValueOffset() {
return valueOffset;
}
public void setValueOffset(int valueOffset) {
this.valueOffset = valueOffset;
}
public int getValueLength() {
return valueLength;
}
public void setValueLength(int valueLength) {
this.valueLength = valueLength;
}
public byte[] getTagsArray() {
return tagsArray;
}
public void setTagsArray(byte[] tagsArray) {
this.tagsArray = tagsArray;
}
public int getTagsOffset() {
return tagsOffset;
}
public void setTagsOffset(int tagsOffset) {
this.tagsOffset = tagsOffset;
}
public int getTagsLength() {
return tagsLength;
}
public void setTagsLength(int tagsLength) {
this.tagsLength = tagsLength;
}
}
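ResultCell mirrors HBase's offset/length cell layout, so the backing arrays must be sliced rather than read whole; MockHBaseClientService above happens to use zero offsets, but general consumers cannot assume that. A small hedged helper (class and method names are illustrative):

import org.apache.nifi.hbase.scan.ResultCell;

import java.nio.charset.StandardCharsets;

public class ResultCellText {

    // decode the row key of a cell, honoring its offset and length
    static String rowOf(final ResultCell cell) {
        return new String(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), StandardCharsets.UTF_8);
    }

    // decode the cell value the same way
    static String valueOf(final ResultCell cell) {
        return new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), StandardCharsets.UTF_8);
    }
}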

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.scan;
/**
* Handles a single row from an HBase scan.
*/
public interface ResultHandler {
void handle(byte[] row, ResultCell[] resultCells);
}
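scan(...) calls handle once per row, so stateful handlers are the natural way to aggregate a scan. For illustration, a minimal counting implementation (hypothetical class, not part of the commit):

import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;

public class RowCountingHandler implements ResultHandler {

    private long rowCount = 0;

    @Override
    public void handle(final byte[] row, final ResultCell[] resultCells) {
        rowCount++; // one invocation per scanned row
    }

    public long getRowCount() {
        return rowCount;
    }
}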

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase.validate;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.util.StandardValidators;
public class ConfigFilesValidator implements Validator {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
final String[] filenames = value.split(",");
for (final String filename : filenames) {
final ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR.validate(subject, filename.trim(), context);
if (!result.isValid()) {
return result;
}
}
return new ValidationResult.Builder().subject(subject).input(value).valid(true).build();
}
}

@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-bundle</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
<version>0.4.0-SNAPSHOT</version>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

@@ -0,0 +1,357 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for these
subcomponents is subject to the terms and conditions of the following
licenses.
The binary distribution of this product bundles 'Jcodings' under an MIT style
license.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
The binary distribution of this product bundles 'Joni' under an MIT style
license.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
The binary distribution of this product bundles 'Google Protocol Buffers Java 2.5.0'
which is licensed under a BSD license.
This license applies to all parts of Protocol Buffers except the following:
- Atomicops support for generic gcc, located in
src/google/protobuf/stubs/atomicops_internals_generic_gcc.h.
This file is copyrighted by Red Hat Inc.
- Atomicops support for AIX/POWER, located in
src/google/protobuf/stubs/atomicops_internals_aix.h.
This file is copyrighted by Bloomberg Finance LP.
Copyright 2014, Google Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Code generated by the Protocol Buffer compiler is owned by the owner
of the input file used when generating it. This code is not
standalone and requires a support library to be linked with it. This
support library is itself covered by the above license.
The binary distribution of this product bundles 'Paranamer Core' which is available
under a BSD style license.
Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holders nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
THE POSSIBILITY OF SUCH DAMAGE.
The binary distribution of this product bundles 'JCraft Jsch' which is available
under a BSD style license.
Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
3. The names of the authors may not be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT,
INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
View File
@@ -0,0 +1,334 @@
nifi-hbase_1_1_2-client-service-nar
Copyright 2014-2015 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
(ASLv2) Apache Commons CLI
The following NOTICE information applies:
Apache Commons CLI
Copyright 2001-2009 The Apache Software Foundation
(ASLv2) Apache Curator
The following NOTICE information applies:
Curator Framework
Copyright 2011-2014 The Apache Software Foundation
Curator Client
Copyright 2011-2014 The Apache Software Foundation
Curator Recipes
Copyright 2011-2014 The Apache Software Foundation
(ASLv2) Apache Directory Server
The following NOTICE information applies:
ApacheDS Protocol Kerberos Codec
Copyright 2003-2013 The Apache Software Foundation
ApacheDS I18n
Copyright 2003-2013 The Apache Software Foundation
Apache Directory API ASN.1 API
Copyright 2003-2013 The Apache Software Foundation
Apache Directory LDAP API Utilities
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache Commons Math
The following NOTICE information applies:
Apache Commons Math
Copyright 2001-2012 The Apache Software Foundation
This product includes software developed by
The Apache Software Foundation (http://www.apache.org/).
===============================================================================
The BracketFinder (package org.apache.commons.math3.optimization.univariate)
and PowellOptimizer (package org.apache.commons.math3.optimization.general)
classes are based on the Python code in module "optimize.py" (version 0.5)
developed by Travis E. Oliphant for the SciPy library (http://www.scipy.org/)
Copyright © 2003-2009 SciPy Developers.
===============================================================================
The LinearConstraint, LinearObjectiveFunction, LinearOptimizer,
RelationShip, SimplexSolver and SimplexTableau classes in package
org.apache.commons.math3.optimization.linear include software developed by
Benjamin McCann (http://www.benmccann.com) and distributed with
the following copyright: Copyright 2009 Google Inc.
===============================================================================
This product includes software developed by the
University of Chicago, as Operator of Argonne National
Laboratory.
The LevenbergMarquardtOptimizer class in package
org.apache.commons.math3.optimization.general includes software
translated from the lmder, lmpar and qrsolv Fortran routines
from the Minpack package
Minpack Copyright Notice (1999) University of Chicago. All rights reserved
===============================================================================
The GraggBulirschStoerIntegrator class in package
org.apache.commons.math3.ode.nonstiff includes software translated
from the odex Fortran routine developed by E. Hairer and G. Wanner.
Original source copyright:
Copyright (c) 2004, Ernst Hairer
===============================================================================
The EigenDecompositionImpl class in package
org.apache.commons.math3.linear includes software translated
from some LAPACK Fortran routines. Original source copyright:
Copyright (c) 1992-2008 The University of Tennessee. All rights reserved.
===============================================================================
The MersenneTwister class in package org.apache.commons.math3.random
includes software translated from the 2002-01-26 version of
the Mersenne-Twister generator written in C by Makoto Matsumoto and Takuji
Nishimura. Original source copyright:
Copyright (C) 1997 - 2002, Makoto Matsumoto and Takuji Nishimura,
All rights reserved
===============================================================================
The LocalizedFormatsTest class in the unit tests is an adapted version of
the OrekitMessagesTest class from the orekit library distributed under the
terms of the Apache 2 licence. Original source copyright:
Copyright 2010 CS Systèmes d'Information
===============================================================================
The HermiteInterpolator class and its corresponding test have been imported from
the orekit library distributed under the terms of the Apache 2 licence. Original
source copyright:
Copyright 2010-2012 CS Systèmes d'Information
===============================================================================
The creation of the package "o.a.c.m.analysis.integration.gauss" was inspired
by an original code donated by Sébastien Brisard.
===============================================================================
(ASLv2) Apache Jakarta HttpClient
The following NOTICE information applies:
Apache Jakarta HttpClient
Copyright 1999-2007 The Apache Software Foundation
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2014 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Apache Commons IO
The following NOTICE information applies:
Apache Commons IO
Copyright 2002-2012 The Apache Software Foundation
(ASLv2) Apache Commons Net
The following NOTICE information applies:
Apache Commons Net
Copyright 2001-2013 The Apache Software Foundation
(ASLv2) Apache Commons Collections
The following NOTICE information applies:
Apache Commons Collections
Copyright 2001-2008 The Apache Software Foundation
(ASLv2) Jettison
The following NOTICE information applies:
Copyright 2006 Envoi Solutions LLC
(ASLv2) Apache Commons Logging
The following NOTICE information applies:
Apache Commons Logging
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2011 The Apache Software Foundation
(ASLv2) Apache log4j
The following NOTICE information applies:
Apache log4j
Copyright 2007 The Apache Software Foundation
(ASLv2) Apache HttpComponents
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2015 The Apache Software Foundation
Apache HttpComponents HttpCore
Copyright 2005-2011 The Apache Software Foundation
(ASLv2) Apache Commons Configuration
The following NOTICE information applies:
Apache Commons Configuration
Copyright 2001-2008 The Apache Software Foundation
(ASLv2) Apache Jakarta Commons Digester
The following NOTICE information applies:
Apache Jakarta Commons Digester
Copyright 2001-2006 The Apache Software Foundation
(ASLv2) Apache Commons BeanUtils
The following NOTICE information applies:
Apache Commons BeanUtils
Copyright 2000-2008 The Apache Software Foundation
(ASLv2) Apache Avro
The following NOTICE information applies:
Apache Avro
Copyright 2009-2013 The Apache Software Foundation
(ASLv2) Snappy Java
The following NOTICE information applies:
This product includes software developed by Google
Snappy: http://code.google.com/p/snappy/ (New BSD License)
This product includes software developed by Apache
PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
(Apache 2.0 license)
This library contains statically linked libstdc++. This inclusion is allowed by the
"GCC Runtime Library Exception"
http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
(ASLv2) ApacheDS
The following NOTICE information applies:
ApacheDS
Copyright 2003-2013 The Apache Software Foundation
(ASLv2) Apache ZooKeeper
The following NOTICE information applies:
Apache ZooKeeper
Copyright 2009-2012 The Apache Software Foundation
(ASLv2) Apache Commons Compress
The following NOTICE information applies:
Apache Commons Compress
Copyright 2002-2014 The Apache Software Foundation
The files in the package org.apache.commons.compress.archivers.sevenz
were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/),
which has been placed in the public domain:
"LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html)
(ASLv2) Apache Commons Daemon
The following NOTICE information applies:
Apache Commons Daemon
Copyright 1999-2013 The Apache Software Foundation
(ASLv2) The Netty Project
The following NOTICE information applies:
The Netty Project
Copyright 2011 The Netty Project
(ASLv2) Apache Xerces Java
The following NOTICE information applies:
Apache Xerces Java
Copyright 1999-2007 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Portions of this software were originally based on the following:
- software copyright (c) 1999, IBM Corporation., http://www.ibm.com.
- software copyright (c) 1999, Sun Microsystems., http://www.sun.com.
- voluntary contributions made by Paul Eng on behalf of the
Apache Software Foundation that were originally developed at iClick, Inc.,
software copyright (c) 1999.
(ASLv2) Google Guice
The following NOTICE information applies:
Google Guice - Core Library
Copyright 2006-2011 Google, Inc.
Google Guice - Extensions - Servlet
Copyright 2006-2011 Google, Inc.
(ASLv2) HBase Common
The following NOTICE information applies:
This product includes portions of the Guava project v14, specifically
'hbase-common/src/main/java/org/apache/hadoop/hbase/io/LimitInputStream.java'
Copyright (C) 2007 The Guava Authors
Licensed under the Apache License, Version 2.0
(ASLv2) HTrace Core
The following NOTICE information applies:
In addition, this product includes software dependencies. See
the accompanying LICENSE.txt for a listing of dependencies
that are NOT Apache licensed (with pointers to their licensing)
Apache HTrace includes an Apache Thrift connector to Zipkin. Zipkin
is a distributed tracing system that is Apache 2.0 Licensed.
Copyright 2012 Twitter, Inc.
(ASLv2) Jackson Core ASL
The following NOTICE information applies:
This product currently only contains code developed by authors
of specific components, as identified by the source code files;
if such notes are missing, files have been created by
Tatu Saloranta.
For additional credits (generally to people who reported problems)
see CREDITS file.
(ASLv2) Jackson Mapper ASL
The following NOTICE information applies:
This product currently only contains code developed by authors
of specific components, as identified by the source code files;
if such notes are missing, files have been created by
Tatu Saloranta.
For additional credits (generally to people who reported problems)
see CREDITS file.
************************
Common Development and Distribution License 1.1
************************
The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details.
(CDDL 1.1) (GPL2 w/ CPE) jersey-server (com.sun.jersey:jersey-server:jar:1.19 - https://jersey.java.net/jersey-server/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net/jersey-client/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-core (com.sun.jersey:jersey-core:jar:1.19 - https://jersey.java.net/jersey-core/)
(CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/index.html)
(CDDL 1.1) (GPL2 w/ CPE) jersey-guice (com.sun.jersey.contribs:jersey-guice:jar:1.9 - https://jersey.java.net/index.html)
(CDDL 1.1) (GPL2 w/ CPE) Old JAXB Runtime (com.sun.xml.bind:jaxb-impl:jar:2.2.3-1 - http://jaxb.java.net/)
(CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/)
************************
Common Development and Distribution License 1.0
************************
The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details.
(CDDL 1.0) JavaServlet(TM) Specification (javax.servlet:servlet-api:jar:2.5 - no url available)
(CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-2 - no url provided)
(CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)
(CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
(CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net)
*****************
Public Domain
*****************
The following binary components are provided to the 'Public Domain'. See project link for details.
(Public Domain) AOP Alliance 1.0 (http://aopalliance.sourceforge.net/)
View File
@@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-bundle</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-hbase_1_1_2-client-service</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-client-service-api</artifactId>
<version>0.4.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
View File
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.reporting.InitializationException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Tags({ "hbase", "client"})
@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2.")
public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService {
private volatile Connection connection;
private List<PropertyDescriptor> properties;
@Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HADOOP_CONF_FILES);
this.properties = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException {
this.connection = createConnection(context);
}
protected Connection createConnection(final ConfigurationContext context) throws IOException {
final Configuration hbaseConfig = HBaseConfiguration.create();
for (final String configFile : context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) {
hbaseConfig.addResource(new Path(configFile.trim()));
}
return ConnectionFactory.createConnection(hbaseConfig);
}
@OnDisabled
public void shutdown() {
if (connection != null) {
try {
connection.close();
} catch (final IOException ioe) {
getLogger().warn("Failed to close connection to HBase due to {}", new Object[]{ioe});
}
}
}
@Override
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
// Create one Put per row....
final Map<String, Put> rowPuts = new HashMap<>();
for (final PutFlowFile putFlowFile : puts) {
Put put = rowPuts.get(putFlowFile.getRow());
if (put == null) {
put = new Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8));
rowPuts.put(putFlowFile.getRow(), put);
}
put.addColumn(putFlowFile.getColumnFamily().getBytes(StandardCharsets.UTF_8),
putFlowFile.getColumnQualifier().getBytes(StandardCharsets.UTF_8),
putFlowFile.getBuffer());
}
table.put(new ArrayList<>(rowPuts.values()));
}
}
@Override
public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
throws IOException {
Filter filter = null;
if (!StringUtils.isBlank(filterExpression)) {
ParseFilter parseFilter = new ParseFilter();
filter = parseFilter.parseFilterString(filterExpression);
}
try (final Table table = connection.getTable(TableName.valueOf(tableName));
final ResultScanner scanner = getResults(table, columns, filter, minTime)) {
for (final Result result : scanner) {
final byte[] rowKey = result.getRow();
final Cell[] cells = result.rawCells();
if (cells == null) {
continue;
}
// convert HBase cells to NiFi cells
final ResultCell[] resultCells = new ResultCell[cells.length];
for (int i=0; i < cells.length; i++) {
final Cell cell = cells[i];
final ResultCell resultCell = new ResultCell();
resultCell.setRowArray(cell.getRowArray());
resultCell.setRowOffset(cell.getRowOffset());
resultCell.setRowLength(cell.getRowLength());
resultCell.setFamilyArray(cell.getFamilyArray());
resultCell.setFamilyOffset(cell.getFamilyOffset());
resultCell.setFamilyLength(cell.getFamilyLength());
resultCell.setQualifierArray(cell.getQualifierArray());
resultCell.setQualifierOffset(cell.getQualifierOffset());
resultCell.setQualifierLength(cell.getQualifierLength());
resultCell.setTimestamp(cell.getTimestamp());
resultCell.setTypeByte(cell.getTypeByte());
resultCell.setSequenceId(cell.getSequenceId());
resultCell.setValueArray(cell.getValueArray());
resultCell.setValueOffset(cell.getValueOffset());
resultCell.setValueLength(cell.getValueLength());
resultCell.setTagsArray(cell.getTagsArray());
resultCell.setTagsOffset(cell.getTagsOffset());
resultCell.setTagsLength(cell.getTagsLength());
resultCells[i] = resultCell;
}
// delegate to the handler
handler.handle(rowKey, resultCells);
}
}
}
// protected and extracted into a separate method for testing
protected ResultScanner getResults(final Table table, final Collection<Column> columns, final Filter filter, final long minTime) throws IOException {
// Create a new scan. We will set the min timerange as the latest timestamp that
// we have seen so far. The minimum timestamp is inclusive, so we will get duplicates.
// We will record any cells that have the latest timestamp, so that when we scan again,
// we know to throw away those duplicates.
final Scan scan = new Scan();
scan.setTimeRange(minTime, Long.MAX_VALUE);
if (filter != null) {
scan.setFilter(filter);
}
if (columns != null) {
for (Column col : columns) {
if (col.getQualifier() == null) {
scan.addFamily(col.getFamily());
} else {
scan.addColumn(col.getFamily(), col.getQualifier());
}
}
}
return table.getScanner(scan);
}
}
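For anyone reading the new API for the first time, here is a rough sketch of how a processor could drive this service, using only the signatures shown above. The HBASE_CLIENT_SERVICE property mirrors the one declared in TestProcessor later in this diff, and the context and minTime variables are assumed to come from the enclosing onTrigger; this is an illustration, not code from the commit.

// Hypothetical caller: assumes a processor that declares HBASE_CLIENT_SERVICE
// the same way TestProcessor below does.
final HBaseClientService hbase = context.getProperty(HBASE_CLIENT_SERVICE)
        .asControllerService(HBaseClientService.class);

// Batch one cell into table "nifi". The constructor arguments follow the
// PutFlowFile usage in the tests: table, row, family, qualifier, value, FlowFile.
final PutFlowFile put = new PutFlowFile("nifi", "row1", "family1", "qualifier1",
        "content1".getBytes(StandardCharsets.UTF_8), null);
hbase.put("nifi", Collections.singletonList(put));

// Scan every row with a timestamp >= minTime; the handler fires once per row.
hbase.scan("nifi", new ArrayList<Column>(), null, minTime, new ResultHandler() {
    @Override
    public void handle(final byte[] rowKey, final ResultCell[] cells) {
        // e.g. serialize the cells and emit one FlowFile per row
    }
});

Because getResults() uses an inclusive minimum timestamp, a caller that polls repeatedly (as GetHBase does) must remember which cells it saw at the latest timestamp and discard those duplicates on the next pass.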
View File
@@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.hbase.HBase_1_1_2_ClientService
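This one-line manifest is what makes the service discoverable: NiFi locates controller service implementations through the standard java.util.ServiceLoader mechanism, reading entries like this from META-INF/services inside each NAR. Roughly, and only as an illustration (narClassLoader is a stand-in for the class loader NiFi builds for the NAR):

// Sketch of ServiceLoader-based discovery; not code from this commit.
final ServiceLoader<ControllerService> loader =
        ServiceLoader.load(ControllerService.class, narClassLoader);
for (final ControllerService service : loader) {
    // with this manifest on the classpath, the loop yields an instance of
    // org.apache.nifi.hbase.HBase_1_1_2_ClientService
    System.out.println(service.getClass().getName());
}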
View File
@@ -0,0 +1,380 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.scan.ResultHandler;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestHBase_1_1_2_ClientService {
@Test
public void testSinglePut() throws InitializationException, IOException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final String content = "content1";
final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
content.getBytes(StandardCharsets.UTF_8), null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final HBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// try to put a single cell
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
hBaseClientService.put(tableName, Arrays.asList(putFlowFile));
// verify only one call to put was made
ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
verify(table, times(1)).put(capture.capture());
// verify only one put was in the list of puts
final List<Put> puts = capture.getValue();
assertEquals(1, puts.size());
verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
}
@Test
public void testMultiplePutsSameRow() throws IOException, InitializationException {
final String tableName = "nifi";
final String row = "row1";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final String content1 = "content1";
final String content2 = "content2";
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
content1.getBytes(StandardCharsets.UTF_8), null);
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row, columnFamily, columnQualifier,
content2.getBytes(StandardCharsets.UTF_8), null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final HBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// try to put multiple cells for the same row
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
hBaseClientService.put(tableName, Arrays.asList(putFlowFile1, putFlowFile2));
// verify put was only called once
ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
verify(table, times(1)).put(capture.capture());
// verify there was only one put in the list of puts
final List<Put> puts = capture.getValue();
assertEquals(1, puts.size());
// verify two cells were added to this one put operation
final NavigableMap<byte[], List<Cell>> familyCells = puts.get(0).getFamilyCellMap();
Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
assertEquals(2, entry.getValue().size());
}
@Test
public void testMultiplePutsDifferentRow() throws IOException, InitializationException {
final String tableName = "nifi";
final String row1 = "row1";
final String row2 = "row2";
final String columnFamily = "family1";
final String columnQualifier = "qualifier1";
final String content1 = "content1";
final String content2 = "content2";
final PutFlowFile putFlowFile1 = new PutFlowFile(tableName, row1, columnFamily, columnQualifier,
content1.getBytes(StandardCharsets.UTF_8), null);
final PutFlowFile putFlowFile2 = new PutFlowFile(tableName, row2, columnFamily, columnQualifier,
content2.getBytes(StandardCharsets.UTF_8), null);
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table so we can verify the put operations later
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final HBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// try to put multiple cells with different rows
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
hBaseClientService.put(tableName, Arrays.asList(putFlowFile1, putFlowFile2));
// verify put was only called once
ArgumentCaptor<List> capture = ArgumentCaptor.forClass(List.class);
verify(table, times(1)).put(capture.capture());
// verify there were two puts in the list
final List<Put> puts = capture.getValue();
assertEquals(2, puts.size());
}
@Test
public void testScan() throws InitializationException, IOException {
final String tableName = "nifi";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table for the client service to scan
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// stage some results in the mock service...
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
cells.put("greeting", "hello");
cells.put("name", "nifi");
service.addResult("row0", cells, now - 2);
service.addResult("row1", cells, now - 1);
service.addResult("row2", cells, now - 1);
service.addResult("row3", cells, now);
// perform a scan and verify the four rows were returned
final CollectingResultHandler handler = new CollectingResultHandler();
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
hBaseClientService.scan(tableName, new ArrayList<Column>(), null, now, handler);
assertEquals(4, handler.results.size());
// get row0 using the row id and verify it has 2 cells
final ResultCell[] results = handler.results.get("row0");
assertNotNull(results);
assertEquals(2, results.length);
verifyResultCell(results[0], "nifi", "greeting", "hello");
verifyResultCell(results[1], "nifi", "name", "nifi");
}
@Test
public void testScanWithValidFilter() throws InitializationException, IOException {
final String tableName = "nifi";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table for the client service to scan
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// create a handler to collect the scan results
final CollectingResultHandler handler = new CollectingResultHandler();
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
// make sure we parse the filter expression without throwing an exception
final String filter = "PrefixFilter ('Row') AND PageFilter (1) AND FirstKeyOnlyFilter ()";
hBaseClientService.scan(tableName, new ArrayList<Column>(), filter, System.currentTimeMillis(), handler);
}
@Test(expected = IllegalArgumentException.class)
public void testScanWithInvalidFilter() throws InitializationException, IOException {
final String tableName = "nifi";
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
// Mock an HBase Table for the client service to scan
final Table table = Mockito.mock(Table.class);
when(table.getName()).thenReturn(TableName.valueOf(tableName));
// create the controller service and link it to the test processor
final MockHBaseClientService service = configureHBaseClientService(runner, table);
runner.assertValid(service);
// create a handler to collect the scan results
final CollectingResultHandler handler = new CollectingResultHandler();
final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
.asControllerService(HBaseClientService.class);
// this should throw IllegalArgumentException
final String filter = "this is not a filter";
hBaseClientService.scan(tableName, new ArrayList<Column>(), filter, System.currentTimeMillis(), handler);
}
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
final MockHBaseClientService service = new MockHBaseClientService(table);
runner.addControllerService("hbaseClient", service);
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml");
runner.enableControllerService(service);
runner.setProperty(TestProcessor.HBASE_CLIENT_SERVICE, "hbaseClient");
return service;
}
private void verifyResultCell(final ResultCell result, final String cf, final String cq, final String val) {
final String colFamily = new String(result.getFamilyArray(), result.getFamilyOffset(), result.getFamilyLength());
assertEquals(cf, colFamily);
final String colQualifier = new String(result.getQualifierArray(), result.getQualifierOffset(), result.getQualifierLength());
assertEquals(cq, colQualifier);
final String value = new String(result.getValueArray(), result.getValueOffset(), result.getValueLength());
assertEquals(val, value);
}
private void verifyPut(String row, String columnFamily, String columnQualifier, String content, Put put) {
assertEquals(row, new String(put.getRow()));
NavigableMap<byte [], List<Cell>> familyCells = put.getFamilyCellMap();
assertEquals(1, familyCells.size());
Map.Entry<byte[], List<Cell>> entry = familyCells.firstEntry();
assertEquals(columnFamily, new String(entry.getKey()));
assertEquals(1, entry.getValue().size());
Cell cell = entry.getValue().get(0);
assertEquals(columnQualifier, new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
assertEquals(content, new String(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
// Override methods to create a mock service that can return staged data
private class MockHBaseClientService extends HBase_1_1_2_ClientService {
private Table table;
private List<Result> results = new ArrayList<>();
public MockHBaseClientService(final Table table) {
this.table = table;
}
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
final Cell[] cellArray = new Cell[cells.size()];
int i = 0;
for (final Map.Entry<String, String> cellEntry : cells.entrySet()) {
final Cell cell = Mockito.mock(Cell.class);
when(cell.getRowArray()).thenReturn(rowArray);
when(cell.getRowOffset()).thenReturn(0);
when(cell.getRowLength()).thenReturn((short) rowArray.length);
final String cellValue = cellEntry.getValue();
final byte[] valueArray = cellValue.getBytes(StandardCharsets.UTF_8);
when(cell.getValueArray()).thenReturn(valueArray);
when(cell.getValueOffset()).thenReturn(0);
when(cell.getValueLength()).thenReturn(valueArray.length);
final byte[] familyArray = "nifi".getBytes(StandardCharsets.UTF_8);
when(cell.getFamilyArray()).thenReturn(familyArray);
when(cell.getFamilyOffset()).thenReturn(0);
when(cell.getFamilyLength()).thenReturn((byte) familyArray.length);
final String qualifier = cellEntry.getKey();
final byte[] qualifierArray = qualifier.getBytes(StandardCharsets.UTF_8);
when(cell.getQualifierArray()).thenReturn(qualifierArray);
when(cell.getQualifierOffset()).thenReturn(0);
when(cell.getQualifierLength()).thenReturn(qualifierArray.length);
when(cell.getTimestamp()).thenReturn(timestamp);
cellArray[i++] = cell;
}
final Result result = Mockito.mock(Result.class);
when(result.getRow()).thenReturn(rowArray);
when(result.rawCells()).thenReturn(cellArray);
results.add(result);
}
@Override
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
Mockito.when(scanner.iterator()).thenReturn(results.iterator());
return scanner;
}
@Override
protected Connection createConnection(ConfigurationContext context) throws IOException {
Connection connection = Mockito.mock(Connection.class);
Mockito.when(connection.getTable(table.getName())).thenReturn(table);
return connection;
}
}
// handler that saves results for verification
private static final class CollectingResultHandler implements ResultHandler {
Map<String,ResultCell[]> results = new LinkedHashMap<>();
@Override
public void handle(byte[] row, ResultCell[] resultCells) {
final String rowStr = new String(row, StandardCharsets.UTF_8);
results.put(rowStr, resultCells);
}
}
}
View File
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.hbase;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.hbase.HBaseClientService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
public class TestProcessor extends AbstractProcessor {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
.description("HBaseClientService")
.identifiesControllerService(HBaseClientService.class)
.required(true)
.build();
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> propDescs = new ArrayList<>();
propDescs.add(HBASE_CLIENT_SERVICE);
return propDescs;
}
}
View File
@@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hbase</value>
</property>
</configuration>
View File
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services</artifactId>
<version>0.4.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-bundle</artifactId>
<version>0.4.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>nifi-hbase_1_1_2-client-service</module>
<module>nifi-hbase_1_1_2-client-service-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.1.2</version>
</dependency>
<!-- the top-level pom forces 18.0, but Hadoop 2.6 expects 12.0.1 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${hadoop.guava.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
View File
@@ -47,5 +47,10 @@
<artifactId>nifi-dbcp-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-client-service-api</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
View File
@@ -33,5 +33,7 @@
<module>nifi-standard-services-api-nar</module>
<module>nifi-dbcp-service-api</module>
<module>nifi-dbcp-service-bundle</module>
<module>nifi-hbase-client-service-api</module>
<module>nifi-hbase_1_1_2-client-service-bundle</module>
</modules>
</project>
View File
@@ -42,6 +42,7 @@
<module>nifi-language-translation-bundle</module>
<module>nifi-mongodb-bundle</module>
<module>nifi-flume-bundle</module>
<module>nifi-hbase-bundle</module>
<module>nifi-ambari-bundle</module>
<module>nifi-image-bundle</module>
<module>nifi-avro-bundle</module>
18 pom.xml
View File
@@ -94,6 +94,7 @@
<spring.security.version>3.2.7.RELEASE</spring.security.version>
<jersey.version>1.19</jersey.version>
<hadoop.version>2.6.2</hadoop.version>
<hadoop.guava.version>12.0.1</hadoop.guava.version>
<yammer.metrics.version>2.2.0</yammer.metrics.version>
</properties>
<dependencyManagement>
@@ -912,6 +913,18 @@
<version>0.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase_1_1_2-client-service-nar</artifactId>
<version>0.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-nar</artifactId>
<version>0.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
@@ -958,6 +971,11 @@
<artifactId>nifi-dbcp-service-api</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hbase-client-service-api</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>