NIFI-259: Bug fixes, unit tests, documentation updates

This commit is contained in:
Mark Payne 2016-01-20 10:16:14 -05:00
parent 7ef8af9eea
commit 593f1288d8
4 changed files with 75 additions and 17 deletions

View File

@ -485,8 +485,8 @@ mechanisms for accomplishing this. The first mechanism is to provide authenticat
The second option is to use a user name and password. This is configured by specifying a value for the `Username` and a value for the `Password` properties
for the `ZooKeeperStateProvider` (see the <<state_providers>> section for more information). The important thing to keep in mind here, though, is that ZooKeeper
will pass around the password in plain text. This means that using a user name and password should not be used unless ZooKeeper is running on localhost as a
one-instance cluster, or if communications with ZooKeeper occur only over encrypted communications, such as an SSL connection. More information about connecting
to ZooKeeper via SSL can be found in the <<zk_ssl_client>> section.
one-instance cluster, or if communications with ZooKeeper occur only over encrypted communications, such as a VPN or an SSL connection. ZooKeeper will be
providing support for SSL connections in version 0.5.0.
@ -494,15 +494,15 @@ to ZooKeeper via SSL can be found in the <<zk_ssl_client>> section.
=== Securing ZooKeeper
When NiFi communicates with ZooKeeper, all communications, by default, are non-secure, and anyone who logs into ZooKeeper is able to view and manipulate all
of the NiFi state that is stored in ZooKeeper. We have two options to prevent this from being the case. We can use Kerberos to manage the authentication, or
we can use SSL in order to encrypt communications between NiFi and ZooKeeper.
of the NiFi state that is stored in ZooKeeper. To prevent this, we can use Kerberos to manage the authentication. At this time, ZooKeeper does not provide
support for encryption via SSL. Support for SSL in ZooKeeper is being actively worked and is expected to be available in the 0.5.x version of ZooKeeper.
In order to secure the communications, we need to ensure that both the client and the server support the same configuration. Instructions for configuring the
NiFi ZooKeeper client and embedded ZooKeeper server to use Kerberos and SSL are provided below.
NiFi ZooKeeper client and embedded ZooKeeper server to use Kerberos are provided below.
[[zk_kerberos_client]]
==== Kerberizing NiFi Client
==== Kerberizing NiFi's ZooKeeper Client
The preferred mechanism for authenticating users with ZooKeeper is to use Kerberos. In order to use Kerberos to authenticate, we must configure a few
system properties, so that the ZooKeeper client knows who the user is and where the KeyTab file is.
@ -559,11 +559,6 @@ Now, when we start NiFi, it will use Kerberos to authentication as the `nifi` us
[[zk_ssl_client]]
==== Using SSL to Communicate with ZooKeeper
[[zk_kerberos_server]]
==== Kerberizing Embedded ZooKeeper Server
When using the embedded ZooKeeper server, we may choose to secure the server by using Kerberos. If Kerberos is not already setup in your environment, you can find
@ -635,11 +630,6 @@ Now, we can start NiFi, and the embedded ZooKeeper server will use Kerberos as t
[[zk_ssl_server]]
==== Securing Embedded ZooKeeper with SSL
[[troubleshooting_kerberos]]
==== Troubleshooting Kerberos Configuration
When using Kerberos, it is import to use fully-qualified domain names and not use _localhost_. Please ensure that the fully qualified hostname of each server is used

View File

@ -52,10 +52,12 @@ public class StateMapSerDe implements SerDe<StateMapUpdate> {
for (final Map.Entry<String, String> entry : map.entrySet()) {
final boolean hasKey = entry.getKey() != null;
final boolean hasValue = entry.getValue() != null;
out.writeBoolean(hasKey);
if (hasKey) {
out.writeUTF(entry.getKey());
}
out.writeBoolean(hasValue);
if (hasValue) {
out.writeUTF(entry.getValue());
}

View File

@ -115,7 +115,6 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
public ZooKeeperStateProvider() throws Exception {
// TODO: Provide SSL Context??
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.state;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.components.state.StateMap;
import org.junit.Test;
import org.wali.UpdateType;
public class TestStateMapSerDe {
@Test
public void testCreateRoundTrip() throws IOException {
final String componentId = "1234";
final StateMapSerDe serde = new StateMapSerDe();
final Map<String, String> stateValues = new HashMap<>();
stateValues.put("abc", "xyz");
stateValues.put("cba", "zyx");
final StateMap stateMap = new StandardStateMap(stateValues, 3L);
final StateMapUpdate record = new StateMapUpdate(stateMap, componentId, UpdateType.CREATE);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final DataOutputStream out = new DataOutputStream(baos)) {
serde.serializeRecord(record, out);
}
final StateMapUpdate update;
final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
try (final DataInputStream in = new DataInputStream(bais)) {
update = serde.deserializeRecord(in, serde.getVersion());
}
assertNotNull(update);
assertEquals(componentId, update.getComponentId());
assertEquals(UpdateType.CREATE, update.getUpdateType());
final StateMap recoveredStateMap = update.getStateMap();
assertEquals(3L, recoveredStateMap.getVersion());
assertEquals(stateValues, recoveredStateMap.toMap());
}
}