mirror of https://github.com/apache/nifi.git
NIFI-10979 Additional nifi-framework upgrades from JUnit 4 to 5
This closes #6806 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
4716c8d715
commit
b74c5b423f
|
@ -67,8 +67,8 @@ import org.apache.nifi.processor.Relationship;
|
|||
import org.apache.nifi.registry.flow.mapping.FlowMappingOptions;
|
||||
import org.apache.nifi.scheduling.ExecutionNode;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -138,7 +138,7 @@ public class StandardVersionedComponentSynchronizerTest {
|
|||
private final Set<String> queuesWithData = Collections.synchronizedSet(new HashSet<>());
|
||||
private final Bundle bundle = new Bundle("group", "artifact", "version 1.0");
|
||||
|
||||
@Before
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
|
||||
final FlowManager flowManager = Mockito.mock(FlowManager.class);
|
||||
|
@ -260,11 +260,9 @@ public class StandardVersionedComponentSynchronizerTest {
|
|||
private void instrumentComponentNodeMethods(final String uuid, final ComponentNode component) {
|
||||
when(component.getIdentifier()).thenReturn(uuid);
|
||||
when(component.getProperties()).thenReturn(Collections.emptyMap());
|
||||
when(component.getPropertyDescriptor(anyString())).thenAnswer(invocation -> {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(invocation.getArgument(0, String.class))
|
||||
.build();
|
||||
});
|
||||
when(component.getPropertyDescriptor(anyString())).thenAnswer(invocation -> new PropertyDescriptor.Builder()
|
||||
.name(invocation.getArgument(0, String.class))
|
||||
.build());
|
||||
when(component.getBundleCoordinate()).thenReturn(new BundleCoordinate("group", "artifact", "version 1.0"));
|
||||
}
|
||||
|
||||
|
@ -375,9 +373,7 @@ public class StandardVersionedComponentSynchronizerTest {
|
|||
|
||||
synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
|
||||
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
synchronizer.synchronize(processorA, versionedProcessor, group, synchronizationOptions);
|
||||
});
|
||||
assertThrows(TimeoutException.class, () -> synchronizer.synchronize(processorA, versionedProcessor, group, synchronizationOptions));
|
||||
|
||||
verifyStopped(processorA);
|
||||
verifyNotRestarted(processorA);
|
||||
|
@ -466,9 +462,7 @@ public class StandardVersionedComponentSynchronizerTest {
|
|||
|
||||
synchronizationOptions = createQuickFailSynchronizationOptions(FlowSynchronizationOptions.ComponentStopTimeoutAction.THROW_TIMEOUT_EXCEPTION);
|
||||
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
synchronizer.synchronize(connectionAB, versionedConnection, group, synchronizationOptions);
|
||||
});
|
||||
assertThrows(TimeoutException.class, () -> synchronizer.synchronize(connectionAB, versionedConnection, group, synchronizationOptions));
|
||||
|
||||
// Ensure that we terminate the source
|
||||
verify(processorA, times(0)).terminate();
|
||||
|
|
|
@ -56,7 +56,7 @@ import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
|
|||
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessGroup;
|
||||
import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
|
||||
import org.apache.nifi.util.FlowDifferenceFilters;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
|
|
@ -18,17 +18,19 @@ package org.apache.nifi.nar;
|
|||
|
||||
import org.apache.nifi.security.util.TlsException;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
public class TestPropertyBasedNarProviderInitializationContext {
|
||||
private static final String PROVIDER_NAME = "external";
|
||||
|
||||
|
@ -45,7 +47,7 @@ public class TestPropertyBasedNarProviderInitializationContext {
|
|||
|
||||
// then
|
||||
Mockito.verify(properties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX);
|
||||
Assert.assertTrue(result.isEmpty());
|
||||
assertTrue(result.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -61,7 +63,7 @@ public class TestPropertyBasedNarProviderInitializationContext {
|
|||
|
||||
// then
|
||||
Mockito.verify(properties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX);
|
||||
Assert.assertTrue(result.isEmpty());
|
||||
assertTrue(result.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -77,7 +79,7 @@ public class TestPropertyBasedNarProviderInitializationContext {
|
|||
|
||||
// then
|
||||
Mockito.verify(properties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX);
|
||||
Assert.assertTrue(result.isEmpty());
|
||||
assertTrue(result.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -94,10 +96,10 @@ public class TestPropertyBasedNarProviderInitializationContext {
|
|||
|
||||
// then
|
||||
Mockito.verify(properties, Mockito.times(1)).getPropertiesWithPrefix(PREFIX);
|
||||
Assert.assertEquals(2, result.size());
|
||||
Assert.assertTrue(result.containsKey("key1"));
|
||||
Assert.assertTrue(result.containsKey("key2"));
|
||||
Assert.assertEquals("value1", result.get("key1"));
|
||||
Assert.assertEquals("value2", result.get("key2"));
|
||||
assertEquals(2, result.size());
|
||||
assertTrue(result.containsKey("key1"));
|
||||
assertTrue(result.containsKey("key2"));
|
||||
assertEquals("value1", result.get("key1"));
|
||||
assertEquals("value2", result.get("key2"));
|
||||
}
|
||||
}
|
|
@ -16,19 +16,21 @@
|
|||
*/
|
||||
package org.apache.nifi.nar;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.nifi.bundle.Bundle;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class NarThreadContextClassLoaderTest {
|
||||
|
||||
|
@ -39,22 +41,21 @@ public class NarThreadContextClassLoaderTest {
|
|||
ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager();
|
||||
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
|
||||
|
||||
Object obj = NarThreadContextClassLoader.createInstance(extensionManager, WithPropertiesConstructor.class.getName(),
|
||||
WithPropertiesConstructor withPropertiesConstructor = NarThreadContextClassLoader.createInstance(extensionManager, WithPropertiesConstructor.class.getName(),
|
||||
WithPropertiesConstructor.class, properties);
|
||||
assertTrue(obj instanceof WithPropertiesConstructor);
|
||||
WithPropertiesConstructor withPropertiesConstructor = (WithPropertiesConstructor) obj;
|
||||
assertNotNull(withPropertiesConstructor.properties);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void validateWithPropertiesConstructorInstantiationFailure() throws Exception {
|
||||
@Test
|
||||
public void validateWithPropertiesConstructorInstantiationFailure() {
|
||||
Map<String, String> additionalProperties = new HashMap<>();
|
||||
additionalProperties.put("fail", "true");
|
||||
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties("src/test/resources/nifi.properties", additionalProperties);
|
||||
Bundle systemBundle = SystemBundle.create(properties);
|
||||
ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager();
|
||||
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
|
||||
NarThreadContextClassLoader.createInstance(extensionManager, WithPropertiesConstructor.class.getName(), WithPropertiesConstructor.class, properties);
|
||||
assertThrows(IllegalStateException.class,
|
||||
() -> NarThreadContextClassLoader.createInstance(extensionManager, WithPropertiesConstructor.class.getName(), WithPropertiesConstructor.class, properties));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -63,16 +64,11 @@ public class NarThreadContextClassLoaderTest {
|
|||
Bundle systemBundle = SystemBundle.create(properties);
|
||||
ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager();
|
||||
extensionManager.discoverExtensions(systemBundle, Collections.emptySet());
|
||||
assertTrue(NarThreadContextClassLoader.createInstance(extensionManager, WithDefaultConstructor.class.getName(),
|
||||
WithDefaultConstructor.class, properties) instanceof WithDefaultConstructor);
|
||||
assertInstanceOf(WithDefaultConstructor.class, NarThreadContextClassLoader.createInstance(extensionManager, WithDefaultConstructor.class.getName(), WithDefaultConstructor.class, properties));
|
||||
}
|
||||
|
||||
public static class WithPropertiesConstructor extends AbstractProcessor {
|
||||
private NiFiProperties properties;
|
||||
|
||||
public WithPropertiesConstructor() {
|
||||
|
||||
}
|
||||
private final NiFiProperties properties;
|
||||
|
||||
public WithPropertiesConstructor(NiFiProperties properties) {
|
||||
if (properties.getProperty("fail") != null) {
|
||||
|
|
|
@ -17,9 +17,8 @@
|
|||
package org.apache.nifi.nar;
|
||||
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -39,13 +38,13 @@ import java.util.Set;
|
|||
import java.util.stream.Stream;
|
||||
|
||||
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class NarUnpackerTest {
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void copyResources() throws IOException {
|
||||
|
||||
final Path sourcePath = Paths.get("./src/test/resources");
|
||||
|
@ -106,7 +105,7 @@ public class NarUnpackerTest {
|
|||
assertEquals(expectedNars.size(), extensionFiles.length);
|
||||
|
||||
for (File extensionFile : extensionFiles) {
|
||||
Assert.assertTrue(expectedNars.contains(extensionFile.getName()));
|
||||
assertTrue(expectedNars.contains(extensionFile.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,17 +16,17 @@
|
|||
*/
|
||||
package org.apache.nifi.nar;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class TestInstanceClassLoader {
|
||||
|
||||
|
|
|
@ -16,15 +16,15 @@
|
|||
*/
|
||||
package org.apache.nifi.controller.repository;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestStandardRepositoryRecord {
|
||||
|
||||
|
|
|
@ -1,387 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.authorization;
|
||||
|
||||
import org.apache.nifi.authorization.exception.AuthorizerCreationException;
|
||||
import org.apache.nifi.authorization.util.ShellRunner;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.mockito.Mockito;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.utility.MountableFile;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
|
||||
public class ShellUserGroupProviderIT {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ShellUserGroupProviderIT.class);
|
||||
|
||||
// These images are publicly available on the hub.docker.com, and the source to each
|
||||
// is available on github. In lieu of using named images, the Dockerfiles could be
|
||||
// migrated into module and referenced in the testcontainer setup.
|
||||
private final static String ALPINE_IMAGE = "natural/alpine-sshd:latest";
|
||||
private final static String CENTOS_IMAGE = "natural/centos-sshd:latest";
|
||||
private final static String DEBIAN_IMAGE = "natural/debian-sshd:latest";
|
||||
private final static String UBUNTU_IMAGE = "natural/ubuntu-sshd:latest";
|
||||
private final static List<String> TEST_CONTAINER_IMAGES =
|
||||
Arrays.asList(
|
||||
ALPINE_IMAGE,
|
||||
CENTOS_IMAGE,
|
||||
DEBIAN_IMAGE,
|
||||
UBUNTU_IMAGE
|
||||
);
|
||||
|
||||
private final static String CONTAINER_SSH_AUTH_KEYS = "/root/.ssh/authorized_keys";
|
||||
private final static Integer CONTAINER_SSH_PORT = 22;
|
||||
|
||||
private final String KNOWN_USER = "root";
|
||||
private final String KNOWN_UID = "0";
|
||||
|
||||
@SuppressWarnings("FieldCanBeLocal")
|
||||
private final String KNOWN_GROUP = "root";
|
||||
|
||||
@SuppressWarnings("FieldCanBeLocal")
|
||||
private final String OTHER_GROUP = "wheel"; // e.g., macos
|
||||
private final String KNOWN_GID = "0";
|
||||
|
||||
// We're using this knob to control the test runs on Travis. The issue there is that tests
|
||||
// running on Travis do not have `getent`, thus not behaving like a typical Linux installation.
|
||||
protected static boolean systemCheckFailed = false;
|
||||
|
||||
private static String sshPrivKeyFile;
|
||||
private static String sshPubKeyFile;
|
||||
|
||||
private AuthorizerConfigurationContext authContext = Mockito.mock(AuthorizerConfigurationContext.class);
|
||||
private ShellUserGroupProvider localProvider;
|
||||
private UserGroupProviderInitializationContext initContext;
|
||||
|
||||
private static ShellRunner shellRunner;
|
||||
|
||||
@ClassRule
|
||||
static public TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupOnce() throws IOException {
|
||||
sshPrivKeyFile = tempFolder.getRoot().getAbsolutePath() + "/id_rsa";
|
||||
sshPubKeyFile = sshPrivKeyFile + ".pub";
|
||||
|
||||
shellRunner = new ShellRunner(60);
|
||||
try {
|
||||
// NB: this command is a bit perplexing: it works without prompt from the shell, but hangs
|
||||
// here without the pipe from `yes`:
|
||||
shellRunner.runShell("yes | ssh-keygen -C '' -N '' -t rsa -f " + sshPrivKeyFile, "Setup");
|
||||
} catch (final IOException ioexc) {
|
||||
systemCheckFailed = true;
|
||||
logger.error("setupOnce() exception: " + ioexc + "; tests cannot run on this system.");
|
||||
return;
|
||||
}
|
||||
|
||||
// Fix the file permissions to abide by the ssh client
|
||||
// requirements:
|
||||
Arrays.asList(sshPrivKeyFile, sshPubKeyFile).forEach(name -> {
|
||||
final File f = new File(name);
|
||||
Assert.assertTrue(f.setReadable(false, false));
|
||||
Assert.assertTrue(f.setReadable(true));
|
||||
});
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
authContext = Mockito.mock(AuthorizerConfigurationContext.class);
|
||||
initContext = Mockito.mock(UserGroupProviderInitializationContext.class);
|
||||
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.REFRESH_DELAY_PROPERTY))).thenReturn(new MockPropertyValue("10 sec"));
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.EXCLUDE_GROUP_PROPERTY))).thenReturn(new MockPropertyValue(".*d$"));
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.EXCLUDE_USER_PROPERTY))).thenReturn(new MockPropertyValue("^s.*"));
|
||||
|
||||
localProvider = new ShellUserGroupProvider();
|
||||
try {
|
||||
localProvider.initialize(initContext);
|
||||
localProvider.onConfigured(authContext);
|
||||
} catch (final Exception exc) {
|
||||
systemCheckFailed = true;
|
||||
logger.error("setup() exception: " + exc + "; tests cannot run on this system.");
|
||||
return;
|
||||
}
|
||||
Assert.assertEquals(10000, localProvider.getRefreshDelay());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
localProvider.preDestruction();
|
||||
}
|
||||
|
||||
private GenericContainer createContainer(String image) throws IOException, InterruptedException {
|
||||
GenericContainer container = new GenericContainer(image)
|
||||
.withEnv("SSH_ENABLE_ROOT", "true").withExposedPorts(CONTAINER_SSH_PORT);
|
||||
container.start();
|
||||
|
||||
// This can go into the docker images:
|
||||
container.execInContainer("mkdir", "-p", "/root/.ssh");
|
||||
container.copyFileToContainer(MountableFile.forHostPath(sshPubKeyFile), CONTAINER_SSH_AUTH_KEYS);
|
||||
return container;
|
||||
}
|
||||
|
||||
private UserGroupProvider createRemoteProvider(GenericContainer container) {
|
||||
final ShellCommandsProvider remoteCommands =
|
||||
RemoteShellCommands.wrapOtherProvider(new NssShellCommands(),
|
||||
sshPrivKeyFile,
|
||||
container.getContainerIpAddress(),
|
||||
container.getMappedPort(CONTAINER_SSH_PORT));
|
||||
|
||||
ShellUserGroupProvider remoteProvider = new ShellUserGroupProvider();
|
||||
remoteProvider.setCommandsProvider(remoteCommands);
|
||||
remoteProvider.initialize(initContext);
|
||||
remoteProvider.onConfigured(authContext);
|
||||
return remoteProvider;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTooShortDelayIntervalThrowsException() throws AuthorizerCreationException {
|
||||
final AuthorizerConfigurationContext authContext = Mockito.mock(AuthorizerConfigurationContext.class);
|
||||
final ShellUserGroupProvider localProvider = new ShellUserGroupProvider();
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.REFRESH_DELAY_PROPERTY))).thenReturn(new MockPropertyValue("1 milliseconds"));
|
||||
|
||||
expectedException.expect(AuthorizerCreationException.class);
|
||||
expectedException.expectMessage("The Refresh Delay '1 milliseconds' is below the minimum value of '10000 ms'");
|
||||
|
||||
localProvider.onConfigured(authContext);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testInvalidGroupExcludeExpressionsThrowsException() throws AuthorizerCreationException {
|
||||
AuthorizerConfigurationContext authContext = Mockito.mock(AuthorizerConfigurationContext.class);
|
||||
ShellUserGroupProvider localProvider = new ShellUserGroupProvider();
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.REFRESH_DELAY_PROPERTY))).thenReturn(new MockPropertyValue("3 minutes"));
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.EXCLUDE_GROUP_PROPERTY))).thenReturn(new MockPropertyValue("(3"));
|
||||
|
||||
expectedException.expect(AuthorizerCreationException.class);
|
||||
expectedException.expectMessage("Unclosed group near index");
|
||||
localProvider.onConfigured(authContext);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidUserExcludeExpressionsThrowsException() throws AuthorizerCreationException {
|
||||
AuthorizerConfigurationContext authContext = Mockito.mock(AuthorizerConfigurationContext.class);
|
||||
ShellUserGroupProvider localProvider = new ShellUserGroupProvider();
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.REFRESH_DELAY_PROPERTY))).thenReturn(new MockPropertyValue("3 minutes"));
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.EXCLUDE_USER_PROPERTY))).thenReturn(new MockPropertyValue("*"));
|
||||
|
||||
expectedException.expect(AuthorizerCreationException.class);
|
||||
expectedException.expectMessage("Dangling meta character");
|
||||
localProvider.onConfigured(authContext);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissingExcludeExpressionsAllowed() throws AuthorizerCreationException {
|
||||
AuthorizerConfigurationContext authContext = Mockito.mock(AuthorizerConfigurationContext.class);
|
||||
ShellUserGroupProvider localProvider = new ShellUserGroupProvider();
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.REFRESH_DELAY_PROPERTY))).thenReturn(new MockPropertyValue("3 minutes"));
|
||||
|
||||
localProvider.onConfigured(authContext);
|
||||
verifyUsersAndUsersMinimumCount(localProvider);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidDelayIntervalThrowsException() throws AuthorizerCreationException {
|
||||
final AuthorizerConfigurationContext authContext = Mockito.mock(AuthorizerConfigurationContext.class);
|
||||
final ShellUserGroupProvider localProvider = new ShellUserGroupProvider();
|
||||
Mockito.when(authContext.getProperty(Mockito.eq(ShellUserGroupProvider.REFRESH_DELAY_PROPERTY))).thenReturn(new MockPropertyValue("Not an interval"));
|
||||
|
||||
expectedException.expect(AuthorizerCreationException.class);
|
||||
expectedException.expectMessage("The Refresh Delay 'Not an interval' is not a valid time interval.");
|
||||
|
||||
localProvider.onConfigured(authContext);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCacheSizesAfterClearingCaches() {
|
||||
localProvider.clearCaches();
|
||||
assert localProvider.userCacheSize() == 0;
|
||||
assert localProvider.groupCacheSize() == 0;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalGetUsersAndUsersMinimumCount() {
|
||||
verifyUsersAndUsersMinimumCount(localProvider);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalGetKnownUserByUsername() {
|
||||
verifyKnownUserByUsername(localProvider);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalGetGroupsAndMinimumGroupCount() {
|
||||
verifyGroupsAndMinimumGroupCount(localProvider);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalGetUserByIdentityAndGetGroupMembership() {
|
||||
verifyGetUserByIdentityAndGetGroupMembership(localProvider);
|
||||
}
|
||||
|
||||
// @Ignore // for now
|
||||
@Test
|
||||
public void testVariousSystemImages() {
|
||||
// Here we explicitly clear the system check flag to allow the remote checks that follow:
|
||||
systemCheckFailed = false;
|
||||
Assume.assumeTrue(isSSHAvailable());
|
||||
|
||||
TEST_CONTAINER_IMAGES.forEach(image -> {
|
||||
GenericContainer container;
|
||||
UserGroupProvider remoteProvider;
|
||||
logger.debug("creating container from image: " + image);
|
||||
|
||||
try {
|
||||
container = createContainer(image);
|
||||
} catch (final Exception exc) {
|
||||
logger.error("create container exception: " + exc);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
remoteProvider = createRemoteProvider(container);
|
||||
} catch (final Exception exc) {
|
||||
logger.error("create user provider exception: " + exc);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
verifyUsersAndUsersMinimumCount(remoteProvider);
|
||||
verifyKnownUserByUsername(remoteProvider);
|
||||
verifyGroupsAndMinimumGroupCount(remoteProvider);
|
||||
verifyGetUserByIdentityAndGetGroupMembership(remoteProvider);
|
||||
} catch (final Exception e) {
|
||||
// Some environments don't allow our tests to work.
|
||||
logger.error("Exception running remote provider on image: " + image + ", exception: " + e);
|
||||
}
|
||||
|
||||
container.stop();
|
||||
remoteProvider.preDestruction();
|
||||
logger.debug("finished with container image: " + image);
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: Make test which retrieves list of users and then getUserByIdentity to ensure the user is populated in the response
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Ensures that the test can run because Docker is available and the remote instance can be reached via ssh.
|
||||
*
|
||||
* @return true if Docker is available on this OS
|
||||
*/
|
||||
private boolean isSSHAvailable() {
|
||||
return !systemCheckFailed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the provider behavior by getting its users and checking minimum size.
|
||||
*
|
||||
* @param provider {@link UserGroupProvider}
|
||||
*/
|
||||
private void verifyUsersAndUsersMinimumCount(UserGroupProvider provider) {
|
||||
Assume.assumeTrue(isSSHAvailable());
|
||||
|
||||
Set<User> users = provider.getUsers();
|
||||
|
||||
// This shows that we don't have any users matching the exclude regex, which is likely because those users
|
||||
// exist but were excluded:
|
||||
for (User user : users) {
|
||||
Assert.assertFalse(user.getIdentifier().startsWith("s"));
|
||||
}
|
||||
|
||||
Assert.assertNotNull(users);
|
||||
Assert.assertTrue(users.size() > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the provider behavior by getting a known user by id.
|
||||
*
|
||||
* @param provider {@link UserGroupProvider}
|
||||
*/
|
||||
private void verifyKnownUserByUsername(UserGroupProvider provider) {
|
||||
Assume.assumeTrue(isSSHAvailable());
|
||||
|
||||
User root = provider.getUserByIdentity(KNOWN_USER);
|
||||
Assert.assertNotNull(root);
|
||||
Assert.assertEquals(KNOWN_USER, root.getIdentity());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the provider behavior by getting its groups and checking minimum size.
|
||||
*
|
||||
* @param provider {@link UserGroupProvider}
|
||||
*/
|
||||
private void verifyGroupsAndMinimumGroupCount(UserGroupProvider provider) {
|
||||
Assume.assumeTrue(isSSHAvailable());
|
||||
|
||||
Set<Group> groups = provider.getGroups();
|
||||
|
||||
// This shows that we don't have any groups matching the exclude regex, which is likely because those groups
|
||||
// exist but were excluded:
|
||||
for (Group group : groups) {
|
||||
Assert.assertFalse(group.getName().endsWith("d"));
|
||||
}
|
||||
|
||||
Assert.assertNotNull(groups);
|
||||
Assert.assertTrue(groups.size() > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the provider behavior by getting a known user and checking its group membership.
|
||||
*
|
||||
* @param provider {@link UserGroupProvider}
|
||||
*/
|
||||
private void verifyGetUserByIdentityAndGetGroupMembership(UserGroupProvider provider) {
|
||||
Assume.assumeTrue(isSSHAvailable());
|
||||
|
||||
UserAndGroups user = provider.getUserAndGroups(KNOWN_USER);
|
||||
Assert.assertNotNull(user);
|
||||
|
||||
try {
|
||||
Assert.assertTrue(user.getGroups().size() > 0);
|
||||
logger.info("root user group count: " + user.getGroups().size());
|
||||
} catch (final AssertionError ignored) {
|
||||
logger.info("root user and groups group count zero on this system");
|
||||
}
|
||||
|
||||
Set<Group> groups = provider.getGroups();
|
||||
Assert.assertTrue(groups.size() > user.getGroups().size());
|
||||
}
|
||||
}
|
|
@ -23,12 +23,9 @@ import org.apache.nifi.security.util.StandardTlsConfiguration
|
|||
import org.apache.nifi.security.util.TlsConfiguration
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Test
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.JUnit4
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
|
@ -36,8 +33,9 @@ import javax.net.ssl.SSLContext
|
|||
import javax.net.ssl.SSLServerSocket
|
||||
import java.security.Security
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
class SocketRemoteSiteListenerTest extends GroovyTestCase {
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue
|
||||
|
||||
class SocketRemoteSiteListenerTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SocketRemoteSiteListenerTest.class)
|
||||
|
||||
private static final String KEYSTORE_PATH = "src/test/resources/localhost-ks.jks"
|
||||
|
@ -71,7 +69,7 @@ class SocketRemoteSiteListenerTest extends GroovyTestCase {
|
|||
|
||||
private SocketRemoteSiteListener srsListener
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
static void setUpOnce() throws Exception {
|
||||
Security.addProvider(new BouncyCastleProvider())
|
||||
|
||||
|
@ -83,11 +81,7 @@ class SocketRemoteSiteListenerTest extends GroovyTestCase {
|
|||
sslContext = SslContextFactory.createSslContext(tlsConfiguration)
|
||||
}
|
||||
|
||||
@Before
|
||||
void setUp() {
|
||||
}
|
||||
|
||||
@After
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
if (srsListener) {
|
||||
srsListener.stop()
|
||||
|
@ -126,6 +120,6 @@ class SocketRemoteSiteListenerTest extends GroovyTestCase {
|
|||
SSLServerSocket sslServerSocket = srsListener.createServerSocket() as SSLServerSocket
|
||||
logger.info("Created SSL server socket: ${KeyStoreUtils.sslServerSocketToString(sslServerSocket)}" as String)
|
||||
assertProtocolVersions(sslServerSocket.enabledProtocols, TlsConfiguration.getCurrentSupportedTlsProtocolVersions())
|
||||
assert sslServerSocket.needClientAuth
|
||||
assertTrue(sslServerSocket.needClientAuth)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,114 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.remote
|
||||
|
||||
import org.apache.nifi.authorization.Authorizer
|
||||
import org.apache.nifi.connectable.Connectable
|
||||
import org.apache.nifi.connectable.ConnectableType
|
||||
import org.apache.nifi.controller.ProcessScheduler
|
||||
import org.apache.nifi.remote.protocol.CommunicationsSession
|
||||
import org.apache.nifi.remote.protocol.ServerProtocol
|
||||
import org.apache.nifi.reporting.BulletinRepository
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.JUnit4
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
class StandardPublicPortGroovyTest extends GroovyTestCase {
|
||||
private static final Logger logger = LoggerFactory.getLogger(StandardPublicPortGroovyTest.class)
|
||||
|
||||
@BeforeClass
|
||||
static void setUpOnce() throws Exception {
|
||||
logger.metaClass.methodMissing = { String name, args ->
|
||||
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
void setUp() {
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
void tearDown() {
|
||||
|
||||
}
|
||||
|
||||
private static PublicPort createPublicPort(NiFiProperties niFiProperties) {
|
||||
Authorizer mockAuthorizer = [:] as Authorizer
|
||||
BulletinRepository mockBulletinRepository = [:] as BulletinRepository
|
||||
ProcessScheduler mockProcessScheduler = [registerEvent: { Connectable worker ->
|
||||
logger.mock("Registered event for worker: ${worker}")
|
||||
}] as ProcessScheduler
|
||||
|
||||
StandardPublicPort spp = new StandardPublicPort("id", "name", TransferDirection.RECEIVE, ConnectableType.INPUT_PORT, mockAuthorizer, mockBulletinRepository, mockProcessScheduler, false, niFiProperties.getBoredYieldDuration(), [])
|
||||
logger.info("Created SPP with mocked collaborators: ${spp}")
|
||||
spp
|
||||
}
|
||||
|
||||
// TODO: Implement test
|
||||
@Ignore("Not yet implemented")
|
||||
@Test
|
||||
void testReceiveFlowFilesShouldHandleBlockedRequestDueToContentLength() {
|
||||
// Arrange
|
||||
Map badProps = [
|
||||
(NiFiProperties.WEB_HTTP_HOST) : "localhost",
|
||||
(NiFiProperties.WEB_HTTPS_HOST): "secure.host.com",
|
||||
(NiFiProperties.WEB_THREADS) : NiFiProperties.DEFAULT_WEB_THREADS
|
||||
]
|
||||
NiFiProperties mockProps = [
|
||||
getPort : { -> 8080 },
|
||||
getSslPort : { -> 8443 },
|
||||
getProperty: { String prop ->
|
||||
String value = badProps[prop] ?: "no_value"
|
||||
logger.mock("getProperty(${prop}) -> ${value}")
|
||||
value
|
||||
},
|
||||
] as NiFiProperties
|
||||
|
||||
StandardPublicPort port = createPublicPort(mockProps)
|
||||
|
||||
final int LISTEN_SECS = 5
|
||||
|
||||
PeerDescription peerDescription = new PeerDescription("localhost", 8080, false)
|
||||
CommunicationsSession mockCommunicationsSession = [:] as CommunicationsSession
|
||||
Peer peer = new Peer(peerDescription, mockCommunicationsSession, "http://localhost", "")
|
||||
ServerProtocol mockServerProtocol = [getRequestExpiration: { -> 500L }] as ServerProtocol
|
||||
|
||||
// Act
|
||||
port.onSchedulingStart()
|
||||
logger.info("Listening on port for ${LISTEN_SECS} seconds")
|
||||
long end = System.nanoTime() + LISTEN_SECS * 1_000_000_000
|
||||
def responses = []
|
||||
while (System.nanoTime() < end) {
|
||||
responses << port.receiveFlowFiles(peer, mockServerProtocol)
|
||||
logger.info("Received ${responses[-1]} flowfiles")
|
||||
}
|
||||
logger.info("Stopped listening on port")
|
||||
logger.info("Received ${responses.sum()} total flowfiles")
|
||||
|
||||
// Assert
|
||||
assert !responses.isEmpty()
|
||||
}
|
||||
}
|
|
@ -16,22 +16,23 @@
|
|||
*/
|
||||
package org.apache.nifi.remote;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.remote.protocol.FlowFileTransaction;
|
||||
import org.apache.nifi.remote.protocol.HandshakeProperties;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestHttpRemoteSiteListener {
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void setup() {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
|
||||
|
@ -42,7 +43,7 @@ public class TestHttpRemoteSiteListener {
|
|||
HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(new NiFiProperties());
|
||||
String transactionId = transactionManager.createTransaction();
|
||||
|
||||
assertTrue("Transaction should be active.", transactionManager.isTransactionActive(transactionId));
|
||||
assertTrue(transactionManager.isTransactionActive(transactionId),"Transaction should be active.");
|
||||
|
||||
ProcessSession processSession = Mockito.mock(ProcessSession.class);
|
||||
FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
|
||||
|
@ -53,7 +54,7 @@ public class TestHttpRemoteSiteListener {
|
|||
transaction = transactionManager.finalizeTransaction(transactionId);
|
||||
assertNotNull(transaction);
|
||||
|
||||
assertFalse("Transaction should not be active anymore.", transactionManager.isTransactionActive(transactionId));
|
||||
assertFalse(transactionManager.isTransactionActive(transactionId),"Transaction should not be active anymore.");
|
||||
|
||||
}
|
||||
|
||||
|
@ -62,42 +63,31 @@ public class TestHttpRemoteSiteListener {
|
|||
HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(new NiFiProperties());
|
||||
String transactionId = transactionManager.createTransaction();
|
||||
|
||||
assertTrue("Transaction should be active.", transactionManager.isTransactionActive(transactionId));
|
||||
assertTrue(transactionManager.isTransactionActive(transactionId),"Transaction should be active.");
|
||||
|
||||
ProcessSession processSession = Mockito.mock(ProcessSession.class);
|
||||
FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
|
||||
transactionManager.holdTransaction(transactionId, transaction, null);
|
||||
|
||||
try {
|
||||
transactionManager.holdTransaction(transactionId, transaction, null);
|
||||
fail("The same transaction id can't hold another transaction");
|
||||
} catch (IllegalStateException e) {
|
||||
}
|
||||
|
||||
assertThrows(IllegalStateException.class,
|
||||
() -> transactionManager.holdTransaction(transactionId, transaction, null),
|
||||
"The same transaction id can't hold another transaction");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoneExistingTransaction() {
|
||||
HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance(new NiFiProperties());
|
||||
|
||||
String transactionId = "does-not-exist-1";
|
||||
assertFalse("Transaction should not be active.", transactionManager.isTransactionActive(transactionId));
|
||||
final String transactionId = "does-not-exist-1";
|
||||
assertFalse(transactionManager.isTransactionActive(transactionId),"Transaction should not be active.");
|
||||
|
||||
ProcessSession processSession = Mockito.mock(ProcessSession.class);
|
||||
FlowFileTransaction transaction = new FlowFileTransaction(processSession, null, null, 0, null, null);
|
||||
try {
|
||||
transactionManager.holdTransaction(transactionId, transaction, null);
|
||||
} catch (IllegalStateException e) {
|
||||
fail("Transaction can be held even if the transaction id is not valid anymore,"
|
||||
+ " in order to support large file or slow network.");
|
||||
}
|
||||
assertDoesNotThrow(() -> transactionManager.holdTransaction(transactionId, transaction, null),
|
||||
"Transaction can be held even if the transaction id is not valid anymore,"
|
||||
+ " in order to support large file or slow network.");
|
||||
|
||||
transactionId = "does-not-exist-2";
|
||||
try {
|
||||
transactionManager.finalizeTransaction(transactionId);
|
||||
fail("But transaction should not be finalized if it isn't active.");
|
||||
} catch (IllegalStateException e) {
|
||||
}
|
||||
assertThrows(IllegalStateException.class, () -> transactionManager.finalizeTransaction("does-not-exist-2"),
|
||||
"But transaction should not be finalized if it isn't active.");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,17 +19,17 @@ package org.apache.nifi.remote;
|
|||
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException;
|
||||
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestPeerDescriptionModifier {
|
||||
|
||||
|
@ -47,12 +47,10 @@ public class TestPeerDescriptionModifier {
|
|||
Properties props = new Properties();
|
||||
props.put("nifi.remote.route.raw.no-host.when", "true");
|
||||
final NiFiProperties properties = new NiFiProperties(props);
|
||||
try {
|
||||
new PeerDescriptionModifier(properties);
|
||||
fail("Should throw an Exception");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Found an invalid Site-to-Site route definition [no-host] 'hostname' is not specified.", e.getMessage());
|
||||
}
|
||||
|
||||
IllegalArgumentException illegalArgumentException =
|
||||
assertThrows(IllegalArgumentException.class, () -> new PeerDescriptionModifier(properties));
|
||||
assertEquals("Found an invalid Site-to-Site route definition [no-host] 'hostname' is not specified.", illegalArgumentException.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -61,12 +59,10 @@ public class TestPeerDescriptionModifier {
|
|||
props.put("nifi.remote.route.raw.no-port.when", "true");
|
||||
props.put("nifi.remote.route.raw.no-port.hostname", "proxy.example.com");
|
||||
final NiFiProperties properties = new NiFiProperties(props);
|
||||
try {
|
||||
new PeerDescriptionModifier(properties);
|
||||
fail("Should throw an Exception");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Found an invalid Site-to-Site route definition [no-port] 'port' is not specified.", e.getMessage());
|
||||
}
|
||||
|
||||
IllegalArgumentException illegalArgumentException =
|
||||
assertThrows(IllegalArgumentException.class, () -> new PeerDescriptionModifier(properties));
|
||||
assertEquals("Found an invalid Site-to-Site route definition [no-port] 'port' is not specified.", illegalArgumentException.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -78,14 +74,12 @@ public class TestPeerDescriptionModifier {
|
|||
props.put("nifi.remote.route.raw.invalid-name.secure", "true");
|
||||
props.put("nifi.remote.route.raw.invalid-name.unsupported", "true");
|
||||
final NiFiProperties properties = new NiFiProperties(props);
|
||||
try {
|
||||
new PeerDescriptionModifier(properties);
|
||||
fail("Should throw an Exception");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.raw.invalid-name.unsupported'." +
|
||||
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
|
||||
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage());
|
||||
}
|
||||
|
||||
IllegalArgumentException illegalArgumentException =
|
||||
assertThrows(IllegalArgumentException.class, () -> new PeerDescriptionModifier(properties));
|
||||
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.raw.invalid-name.unsupported'." +
|
||||
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
|
||||
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", illegalArgumentException.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -93,14 +87,11 @@ public class TestPeerDescriptionModifier {
|
|||
Properties props = new Properties();
|
||||
props.put("nifi.remote.route.", "true");
|
||||
final NiFiProperties properties = new NiFiProperties(props);
|
||||
try {
|
||||
new PeerDescriptionModifier(properties);
|
||||
fail("Should throw an Exception");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.'." +
|
||||
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
|
||||
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage());
|
||||
}
|
||||
IllegalArgumentException illegalArgumentException =
|
||||
assertThrows(IllegalArgumentException.class, () -> new PeerDescriptionModifier(properties));
|
||||
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.'." +
|
||||
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
|
||||
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", illegalArgumentException.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -108,14 +99,11 @@ public class TestPeerDescriptionModifier {
|
|||
Properties props = new Properties();
|
||||
props.put("nifi.remote.route.http.", "true");
|
||||
final NiFiProperties properties = new NiFiProperties(props);
|
||||
try {
|
||||
new PeerDescriptionModifier(properties);
|
||||
fail("Should throw an Exception");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.http.'." +
|
||||
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
|
||||
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", e.getMessage());
|
||||
}
|
||||
IllegalArgumentException illegalArgumentException =
|
||||
assertThrows(IllegalArgumentException.class, () -> new PeerDescriptionModifier(properties));
|
||||
assertEquals("Found an invalid Site-to-Site route definition property 'nifi.remote.route.http.'." +
|
||||
" Routing property keys should be formatted as 'nifi.remote.route.{protocol}.{name}.{routingConfigName}'." +
|
||||
" Where {protocol} is 'raw' or 'http', and {routingConfigName} is 'when', 'hostname', 'port' or 'secure'.", illegalArgumentException.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -130,13 +118,11 @@ public class TestPeerDescriptionModifier {
|
|||
final PeerDescription source = new PeerDescription("client", 12345, true);
|
||||
final PeerDescription target = new PeerDescription("nifi0", 8081, true);
|
||||
|
||||
try {
|
||||
modifier.modify(source, target,
|
||||
SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.Peers, new HashMap<>());
|
||||
fail("Should throw an Exception");
|
||||
} catch (AttributeExpressionLanguageException e) {
|
||||
assertTrue(e.getMessage().startsWith("Invalid Expression"));
|
||||
}
|
||||
AttributeExpressionLanguageException attributeExpressionLanguageException =
|
||||
assertThrows(AttributeExpressionLanguageException.class,
|
||||
() -> modifier.modify(source, target,
|
||||
SiteToSiteTransportProtocol.RAW, PeerDescriptionModifier.RequestType.Peers, new HashMap<>()));
|
||||
assertTrue(attributeExpressionLanguageException.getMessage().startsWith("Invalid Expression"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -190,7 +176,7 @@ public class TestPeerDescriptionModifier {
|
|||
assertNotNull(modifiedTarget);
|
||||
assertEquals("proxy1.example.com", modifiedTarget.getHostname());
|
||||
assertEquals(17491, modifiedTarget.getPort());
|
||||
assertEquals(true, modifiedTarget.isSecure());
|
||||
assertTrue(modifiedTarget.isSecure());
|
||||
|
||||
// To nifi1.
|
||||
target = new PeerDescription("nifi1", 8081, true);
|
||||
|
@ -199,7 +185,7 @@ public class TestPeerDescriptionModifier {
|
|||
assertNotNull(modifiedTarget);
|
||||
assertEquals("proxy1.example.com", modifiedTarget.getHostname());
|
||||
assertEquals(17492, modifiedTarget.getPort());
|
||||
assertEquals(true, modifiedTarget.isSecure());
|
||||
assertTrue(modifiedTarget.isSecure());
|
||||
|
||||
// For requests coming directly, use the original target description.
|
||||
source = new PeerDescription("192.168.1.101", 23456, true);
|
||||
|
@ -242,7 +228,7 @@ public class TestPeerDescriptionModifier {
|
|||
assertNotNull(modifiedTarget);
|
||||
assertEquals("nifi0.example.com", modifiedTarget.getHostname());
|
||||
assertEquals(17491, modifiedTarget.getPort());
|
||||
assertEquals(true, modifiedTarget.isSecure());
|
||||
assertTrue(modifiedTarget.isSecure());
|
||||
|
||||
// To nifi1.
|
||||
target = new PeerDescription("nifi1", 8081, true);
|
||||
|
@ -251,7 +237,7 @@ public class TestPeerDescriptionModifier {
|
|||
assertNotNull(modifiedTarget);
|
||||
assertEquals("nifi1.example.com", modifiedTarget.getHostname());
|
||||
assertEquals(17491, modifiedTarget.getPort());
|
||||
assertEquals(true, modifiedTarget.isSecure());
|
||||
assertTrue(modifiedTarget.isSecure());
|
||||
|
||||
// For requests coming directly, use the original target description.
|
||||
source = new PeerDescription("192.168.1.101", 23456, true);
|
||||
|
@ -299,7 +285,7 @@ public class TestPeerDescriptionModifier {
|
|||
assertNotNull(modifiedTarget);
|
||||
assertEquals("nifi0.example.com", modifiedTarget.getHostname());
|
||||
assertEquals(443, modifiedTarget.getPort());
|
||||
assertEquals(true, modifiedTarget.isSecure());
|
||||
assertTrue(modifiedTarget.isSecure());
|
||||
|
||||
// To nifi1.
|
||||
proxyHeders.put("X-ProxyHost", "nifi1.example.com:443");
|
||||
|
@ -309,7 +295,7 @@ public class TestPeerDescriptionModifier {
|
|||
assertNotNull(modifiedTarget);
|
||||
assertEquals("nifi1.example.com", modifiedTarget.getHostname());
|
||||
assertEquals(443, modifiedTarget.getPort());
|
||||
assertEquals(true, modifiedTarget.isSecure());
|
||||
assertTrue(modifiedTarget.isSecure());
|
||||
|
||||
// For requests coming directly, use the original target description.
|
||||
source = new PeerDescription("192.168.1.101", 23456, true);
|
||||
|
|
|
@ -20,23 +20,22 @@ import org.apache.nifi.remote.cluster.NodeInformation;
|
|||
import org.apache.nifi.remote.protocol.RequestType;
|
||||
import org.apache.nifi.remote.protocol.ServerProtocol;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestSocketRemoteSiteListener {
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void setup() {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
|
||||
|
|
|
@ -25,8 +25,7 @@ import org.apache.nifi.controller.ProcessScheduler;
|
|||
import org.apache.nifi.groups.ProcessGroup;
|
||||
import org.apache.nifi.reporting.BulletinRepository;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
|
@ -37,6 +36,9 @@ import static org.mockito.Mockito.doAnswer;
|
|||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class TestStandardPublicPort {
|
||||
|
||||
private PublicPort createPublicPort(NiFiProperties nifiProperties) {
|
||||
|
@ -72,10 +74,10 @@ public class TestStandardPublicPort {
|
|||
final PublicPort port = createPublicPort(nifiProperties);
|
||||
|
||||
PortAuthorizationResult authResult = port.checkUserAuthorization("CN=node1, OU=nifi.test");
|
||||
Assert.assertFalse(authResult.isAuthorized());
|
||||
assertFalse(authResult.isAuthorized());
|
||||
|
||||
authResult = port.checkUserAuthorization("node1@nifi.test");
|
||||
Assert.assertTrue(authResult.isAuthorized());
|
||||
assertTrue(authResult.isAuthorized());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -97,10 +99,10 @@ public class TestStandardPublicPort {
|
|||
final PublicPort port = createPublicPort(nifiProperties);
|
||||
|
||||
PortAuthorizationResult authResult = port.checkUserAuthorization("CN=node2, OU=nifi.test");
|
||||
Assert.assertFalse(authResult.isAuthorized());
|
||||
assertFalse(authResult.isAuthorized());
|
||||
|
||||
authResult = port.checkUserAuthorization("CN=node1, OU=nifi.test");
|
||||
Assert.assertTrue(authResult.isAuthorized());
|
||||
assertTrue(authResult.isAuthorized());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -125,10 +127,10 @@ public class TestStandardPublicPort {
|
|||
final PublicPort port = createPublicPort(nifiProperties);
|
||||
|
||||
PortAuthorizationResult authResult = port.checkUserAuthorization("CN=node2, OU=nifi.test");
|
||||
Assert.assertFalse(authResult.isAuthorized());
|
||||
assertFalse(authResult.isAuthorized());
|
||||
|
||||
authResult = port.checkUserAuthorization("CN=node1, OU=nifi.test");
|
||||
Assert.assertTrue(authResult.isAuthorized());
|
||||
assertTrue(authResult.isAuthorized());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -41,8 +41,8 @@ import org.apache.nifi.util.MockProcessContext;
|
|||
import org.apache.nifi.util.MockProcessSession;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.util.SharedSessionState;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.util.ArrayList;
|
||||
|
@ -56,7 +56,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
|
@ -83,7 +83,7 @@ public class TestStandardRemoteGroupPort {
|
|||
private MockProcessSession processSession;
|
||||
private MockProcessContext processContext;
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void setup() throws Exception {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
|
||||
|
@ -367,7 +367,7 @@ public class TestStandardRemoteGroupPort {
|
|||
|
||||
// Verify transactions, sent packets, and provenance events.
|
||||
assertEquals(flowFiles.size(), totalPacketsSent.get());
|
||||
assertEquals("The number of transactions should match as expected.", expectedNumberOfPackets.length, sentPackets.size());
|
||||
assertEquals(expectedNumberOfPackets.length, sentPackets.size(),"The number of transactions should match as expected.");
|
||||
final List<ProvenanceEventRecord> provenanceEvents = sessionState.getProvenanceEvents();
|
||||
// SEND and DROP events for each flowfile
|
||||
assertEquals(flowFiles.size() * 2, provenanceEvents.size());
|
||||
|
|
|
@ -1,116 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.remote;
|
||||
|
||||
//package nifi.remote;
|
||||
//
|
||||
//import static org.junit.Assert.assertEquals;
|
||||
//
|
||||
//import java.io.IOException;
|
||||
//import java.util.LinkedHashMap;
|
||||
//import java.util.List;
|
||||
//import java.util.Map;
|
||||
//
|
||||
//import nifi.cluster.NodeInformation;
|
||||
//import nifi.remote.StandardSiteToSiteProtocol.Destination;
|
||||
//
|
||||
//import org.junit.Assert;
|
||||
//import org.junit.Test;
|
||||
//import org.mockito.Mockito;
|
||||
//
|
||||
//public class TestStandardSiteToSiteProtocol {
|
||||
//
|
||||
// @Test
|
||||
// public void testWeightedDistributionWithTwoNodes() throws IOException {
|
||||
// final Map<NodeInformation, Destination> destinationMap = new LinkedHashMap<>();
|
||||
// final NodeInformation node1 = new NodeInformation("hostA", 80, 90, true, 3);
|
||||
// final NodeInformation node2 = new NodeInformation("hostB", 80, 90, true, 500);
|
||||
//
|
||||
// final Destination node1Destination = new Destination(createRemoteGroupPort("PortA"), null, node1, TransferDirection.SEND, true, null);
|
||||
// final Destination node2Destination = new Destination(createRemoteGroupPort("PortB"), null, node2, TransferDirection.SEND, true, null);
|
||||
//
|
||||
// destinationMap.put(node1, node1Destination);
|
||||
// destinationMap.put(node2, node2Destination);
|
||||
//
|
||||
// final List<Destination> destinations = StandardSiteToSiteProtocol.formulateDestinationList(destinationMap, TransferDirection.SEND);
|
||||
// int node1Count = 0, node2Count = 0;
|
||||
// for ( final Destination destination : destinations ) {
|
||||
// if ( destination.getNodeInformation() == node1 ) {
|
||||
// node1Count++;
|
||||
// } else if ( destination.getNodeInformation() == node2 ) {
|
||||
// node2Count++;
|
||||
// } else {
|
||||
// Assert.fail("Got Destination for unknkown NodeInformation");
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// System.out.println(node1Count);
|
||||
// System.out.println(node2Count);
|
||||
//
|
||||
// final double node1Pct = (double) node1Count / (double) (node1Count + node2Count);
|
||||
// assertEquals(0.80, node1Pct, 0.01);
|
||||
// // node1 should get the most but is not allowed to have more than approximately 80% of the data.
|
||||
// }
|
||||
//
|
||||
// @Test
|
||||
// public void testWeightedDistributionWithThreeNodes() throws IOException {
|
||||
// final Map<NodeInformation, Destination> destinationMap = new LinkedHashMap<>();
|
||||
// final NodeInformation node1 = new NodeInformation("hostA", 80, 90, true, 3);
|
||||
// final NodeInformation node2 = new NodeInformation("hostB", 80, 90, true, 500);
|
||||
// final NodeInformation node3 = new NodeInformation("hostC", 80, 90, true, 500);
|
||||
//
|
||||
// final Destination node1Destination = new Destination(createRemoteGroupPort("PortA"), null, node1, TransferDirection.SEND, true, null);
|
||||
// final Destination node2Destination = new Destination(createRemoteGroupPort("PortB"), null, node2, TransferDirection.SEND, true, null);
|
||||
// final Destination node3Destination = new Destination(createRemoteGroupPort("PortC"), null, node3, TransferDirection.SEND, true, null);
|
||||
//
|
||||
// destinationMap.put(node1, node1Destination);
|
||||
// destinationMap.put(node2, node2Destination);
|
||||
// destinationMap.put(node3, node3Destination);
|
||||
//
|
||||
// final List<Destination> destinations = StandardSiteToSiteProtocol.formulateDestinationList(destinationMap, TransferDirection.SEND);
|
||||
// int node1Count = 0, node2Count = 0, node3Count = 0;
|
||||
// for ( final Destination destination : destinations ) {
|
||||
// if ( destination.getNodeInformation() == node1 ) {
|
||||
// node1Count++;
|
||||
// } else if ( destination.getNodeInformation() == node2 ) {
|
||||
// node2Count++;
|
||||
// } else if ( destination.getNodeInformation() == node3 ) {
|
||||
// node3Count++;
|
||||
// } else {
|
||||
// Assert.fail("Got Destination for unknkown NodeInformation");
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// System.out.println(node1Count);
|
||||
// System.out.println(node2Count);
|
||||
// System.out.println(node3Count);
|
||||
//
|
||||
// final double node1Pct = (double) node1Count / (double) (node1Count + node2Count + node3Count);
|
||||
// final double node2Pct = (double) node2Count / (double) (node1Count + node2Count + node3Count);
|
||||
// final double node3Pct = (double) node3Count / (double) (node1Count + node2Count + node3Count);
|
||||
//
|
||||
// assertEquals(0.5, node1Pct, 0.02);
|
||||
// assertEquals(0.25, node2Pct, 0.02);
|
||||
// assertEquals(node2Pct, node3Pct, 0.02);
|
||||
// }
|
||||
//
|
||||
// private RemoteGroupPort createRemoteGroupPort(final String portName) {
|
||||
// RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class);
|
||||
// Mockito.when(port.getName()).thenReturn(portName);
|
||||
// return port;
|
||||
// }
|
||||
//}
|
|
@ -48,8 +48,8 @@ import org.apache.nifi.util.MockProcessContext;
|
|||
import org.apache.nifi.util.MockProcessSession;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.apache.nifi.util.SharedSessionState;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -68,10 +68,10 @@ import java.util.function.Function;
|
|||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
|
@ -84,7 +84,7 @@ public class TestHttpFlowFileServerProtocol {
|
|||
private MockProcessSession processSession;
|
||||
private MockProcessContext processContext;
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void setup() throws Exception {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
|
||||
|
@ -112,22 +112,18 @@ public class TestHttpFlowFileServerProtocol {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testIllegalHandshakeProperty() throws Exception {
|
||||
public void testIllegalHandshakeProperty() {
|
||||
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
|
||||
final Peer peer = getDefaultPeer();
|
||||
((HttpServerCommunicationsSession)peer.getCommunicationsSession()).getHandshakeParams().clear();
|
||||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.MISSING_PROPERTY, e.getResponseCode());
|
||||
}
|
||||
|
||||
HandshakeException handshakeException =
|
||||
assertThrows(HandshakeException.class, () -> serverProtocol.handshake(peer));
|
||||
assertEquals(ResponseCode.MISSING_PROPERTY, handshakeException.getResponseCode());
|
||||
assertFalse(serverProtocol.isHandshakeSuccessful());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnknownPort() throws Exception {
|
||||
public void testUnknownPort() {
|
||||
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
|
||||
final Peer peer = getDefaultPeer();
|
||||
((HttpServerCommunicationsSession)peer.getCommunicationsSession())
|
||||
|
@ -137,18 +133,14 @@ public class TestHttpFlowFileServerProtocol {
|
|||
doReturn(true).when(processGroup).isRootGroup();
|
||||
|
||||
serverProtocol.setRootProcessGroup(processGroup);
|
||||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.UNKNOWN_PORT, e.getResponseCode());
|
||||
}
|
||||
|
||||
HandshakeException handshakeException =
|
||||
assertThrows(HandshakeException.class, () -> serverProtocol.handshake(peer));
|
||||
assertEquals(ResponseCode.UNKNOWN_PORT, handshakeException.getResponseCode());
|
||||
assertFalse(serverProtocol.isHandshakeSuccessful());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnauthorized() throws Exception {
|
||||
public void testUnauthorized() {
|
||||
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
|
||||
final Peer peer = getDefaultPeer();
|
||||
((HttpServerCommunicationsSession)peer.getCommunicationsSession())
|
||||
|
@ -162,18 +154,14 @@ public class TestHttpFlowFileServerProtocol {
|
|||
doReturn(authResult).when(port).checkUserAuthorization(any(String.class));
|
||||
|
||||
serverProtocol.setRootProcessGroup(processGroup);
|
||||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.UNAUTHORIZED, e.getResponseCode());
|
||||
}
|
||||
|
||||
HandshakeException handshakeException =
|
||||
assertThrows(HandshakeException.class, () -> serverProtocol.handshake(peer));
|
||||
assertEquals(ResponseCode.UNAUTHORIZED, handshakeException.getResponseCode());
|
||||
assertFalse(serverProtocol.isHandshakeSuccessful());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPortNotInValidState() throws Exception {
|
||||
public void testPortNotInValidState() {
|
||||
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
|
||||
final Peer peer = getDefaultPeer();
|
||||
((HttpServerCommunicationsSession)peer.getCommunicationsSession())
|
||||
|
@ -188,18 +176,14 @@ public class TestHttpFlowFileServerProtocol {
|
|||
doReturn(true).when(authResult).isAuthorized();
|
||||
|
||||
serverProtocol.setRootProcessGroup(processGroup);
|
||||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE, e.getResponseCode());
|
||||
}
|
||||
|
||||
HandshakeException handshakeException =
|
||||
assertThrows(HandshakeException.class, () -> serverProtocol.handshake(peer));
|
||||
assertEquals(ResponseCode.PORT_NOT_IN_VALID_STATE, handshakeException.getResponseCode());
|
||||
assertFalse(serverProtocol.isHandshakeSuccessful());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPortDestinationFull() throws Exception {
|
||||
public void testPortDestinationFull() {
|
||||
final HttpFlowFileServerProtocol serverProtocol = getDefaultHttpFlowFileServerProtocol();
|
||||
final Peer peer = getDefaultPeer();
|
||||
((HttpServerCommunicationsSession)peer.getCommunicationsSession())
|
||||
|
@ -223,13 +207,9 @@ public class TestHttpFlowFileServerProtocol {
|
|||
doReturn(true).when(flowFileQueue).isFull();
|
||||
|
||||
serverProtocol.setRootProcessGroup(processGroup);
|
||||
try {
|
||||
serverProtocol.handshake(peer);
|
||||
fail();
|
||||
} catch (final HandshakeException e) {
|
||||
assertEquals(ResponseCode.PORTS_DESTINATION_FULL, e.getResponseCode());
|
||||
}
|
||||
|
||||
HandshakeException handshakeException =
|
||||
assertThrows(HandshakeException.class, () -> serverProtocol.handshake(peer));
|
||||
assertEquals(ResponseCode.PORTS_DESTINATION_FULL, handshakeException.getResponseCode());
|
||||
assertFalse(serverProtocol.isHandshakeSuccessful());
|
||||
}
|
||||
|
||||
|
@ -252,17 +232,10 @@ public class TestHttpFlowFileServerProtocol {
|
|||
|
||||
final ProcessContext context = null;
|
||||
final ProcessSession processSession = null;
|
||||
try {
|
||||
serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded);
|
||||
fail("transferFlowFiles should fail since it's already shutdown.");
|
||||
} catch (final IllegalStateException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded);
|
||||
fail("receiveFlowFiles should fail since it's already shutdown.");
|
||||
} catch (final IllegalStateException e) {
|
||||
}
|
||||
assertThrows(IllegalStateException.class,() -> serverProtocol.transferFlowFiles(peer, context, processSession, negotiatedCoded),
|
||||
"transferFlowFiles should fail since it's already shutdown.");
|
||||
assertThrows(IllegalStateException.class, () -> serverProtocol.receiveFlowFiles(peer, context, processSession, negotiatedCoded),
|
||||
"receiveFlowFiles should fail since it's already shutdown.");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -348,12 +321,9 @@ public class TestHttpFlowFileServerProtocol {
|
|||
});
|
||||
|
||||
// Commit transaction
|
||||
try {
|
||||
serverProtocol.commitTransferTransaction(peer, "client-sent-wrong-checksum");
|
||||
fail();
|
||||
} catch (final IOException e) {
|
||||
assertTrue(e.getMessage().contains("CRC32 Checksum"));
|
||||
}
|
||||
IOException ioException =
|
||||
assertThrows(IOException.class, () -> serverProtocol.commitTransferTransaction(peer, "client-sent-wrong-checksum"));
|
||||
assertTrue(ioException.getMessage().contains("CRC32 Checksum"));
|
||||
}
|
||||
|
||||
private Peer transferFlowFiles(final HttpFlowFileServerProtocol serverProtocol, final String transactionId,
|
||||
|
@ -516,12 +486,8 @@ public class TestHttpFlowFileServerProtocol {
|
|||
|
||||
// Commit transaction
|
||||
commsSession.setResponseCode(ResponseCode.BAD_CHECKSUM);
|
||||
try {
|
||||
serverProtocol.commitReceiveTransaction(peer);
|
||||
fail();
|
||||
} catch (final IOException e) {
|
||||
assertTrue(e.getMessage().contains("Received a BadChecksum response"));
|
||||
}
|
||||
IOException ioException = assertThrows(IOException.class, () -> serverProtocol.commitReceiveTransaction(peer));
|
||||
assertTrue(ioException.getMessage().contains("Received a BadChecksum response"));
|
||||
}
|
||||
|
||||
private void receiveFlowFiles(final HttpFlowFileServerProtocol serverProtocol, final String transactionId, final Peer peer, final DataPacket ... dataPackets) throws IOException {
|
||||
|
|
|
@ -29,8 +29,8 @@ import org.apache.nifi.remote.protocol.HandshakeProperty;
|
|||
import org.apache.nifi.remote.protocol.Response;
|
||||
import org.apache.nifi.remote.protocol.ResponseCode;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -43,14 +43,14 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestSocketFlowFileServerProtocol {
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void setup() throws Exception {
|
||||
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
|
||||
|
|
|
@ -26,14 +26,14 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockProcessSession;
|
||||
import org.apache.nifi.util.SharedSessionState;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
public class NewestFirstPrioritizerTest {
|
||||
|
||||
@Test
|
||||
public void testPrioritizer() throws InstantiationException, IllegalAccessException {
|
||||
public void testPrioritizer() {
|
||||
final Processor processor = new SimpleProcessor();
|
||||
final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
|
||||
|
@ -46,13 +46,13 @@ public class NewestFirstPrioritizerTest {
|
|||
final MockFlowFile flowFile2 = session.create();
|
||||
|
||||
final NewestFlowFileFirstPrioritizer prioritizer = new NewestFlowFileFirstPrioritizer();
|
||||
Assert.assertEquals(0, prioritizer.compare(null, null));
|
||||
Assert.assertEquals(-1, prioritizer.compare(flowFile1, null));
|
||||
Assert.assertEquals(1, prioritizer.compare(null, flowFile1));
|
||||
Assert.assertEquals(0, prioritizer.compare(flowFile1, flowFile1));
|
||||
Assert.assertEquals(0, prioritizer.compare(flowFile2, flowFile2));
|
||||
Assert.assertEquals(1, prioritizer.compare(flowFile1, flowFile2));
|
||||
Assert.assertEquals(-1, prioritizer.compare(flowFile2, flowFile1));
|
||||
assertEquals(0, prioritizer.compare(null, null));
|
||||
assertEquals(-1, prioritizer.compare(flowFile1, null));
|
||||
assertEquals(1, prioritizer.compare(null, flowFile1));
|
||||
assertEquals(0, prioritizer.compare(flowFile1, flowFile1));
|
||||
assertEquals(0, prioritizer.compare(flowFile2, flowFile2));
|
||||
assertEquals(1, prioritizer.compare(flowFile1, flowFile2));
|
||||
assertEquals(-1, prioritizer.compare(flowFile2, flowFile1));
|
||||
}
|
||||
|
||||
public class SimpleProcessor extends AbstractProcessor {
|
||||
|
|
|
@ -16,8 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.prioritizer;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
|
@ -26,14 +24,17 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockProcessSession;
|
||||
import org.apache.nifi.util.SharedSessionState;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class OldestFirstPrioritizerTest {
|
||||
|
||||
@Test
|
||||
public void testPrioritizer() throws InstantiationException, IllegalAccessException {
|
||||
public void testPrioritizer() {
|
||||
final Processor processor = new SimpleProcessor();
|
||||
final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
|
||||
|
@ -46,13 +47,13 @@ public class OldestFirstPrioritizerTest {
|
|||
final MockFlowFile flowFile2 = session.create();
|
||||
|
||||
final OldestFlowFileFirstPrioritizer prioritizer = new OldestFlowFileFirstPrioritizer();
|
||||
Assert.assertEquals(0, prioritizer.compare(null, null));
|
||||
Assert.assertEquals(-1, prioritizer.compare(flowFile1, null));
|
||||
Assert.assertEquals(1, prioritizer.compare(null, flowFile1));
|
||||
Assert.assertEquals(0, prioritizer.compare(flowFile1, flowFile1));
|
||||
Assert.assertEquals(0, prioritizer.compare(flowFile2, flowFile2));
|
||||
Assert.assertEquals(-1, prioritizer.compare(flowFile1, flowFile2));
|
||||
Assert.assertEquals(1, prioritizer.compare(flowFile2, flowFile1));
|
||||
assertEquals(0, prioritizer.compare(null, null));
|
||||
assertEquals(-1, prioritizer.compare(flowFile1, null));
|
||||
assertEquals(1, prioritizer.compare(null, flowFile1));
|
||||
assertEquals(0, prioritizer.compare(flowFile1, flowFile1));
|
||||
assertEquals(0, prioritizer.compare(flowFile2, flowFile2));
|
||||
assertEquals(-1, prioritizer.compare(flowFile1, flowFile2));
|
||||
assertEquals(1, prioritizer.compare(flowFile2, flowFile1));
|
||||
}
|
||||
|
||||
public class SimpleProcessor extends AbstractProcessor {
|
||||
|
|
|
@ -16,12 +16,6 @@
|
|||
*/
|
||||
package org.apache.nifi.prioritizer;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
|
@ -31,21 +25,27 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockProcessSession;
|
||||
import org.apache.nifi.util.SharedSessionState;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
public class PriorityAttributePrioritizerTest {
|
||||
|
||||
static Map<String, String> attrsPri1 = new HashMap<String, String>();
|
||||
static Map<String, String> attrsPri2 = new HashMap<String, String>();
|
||||
static Map<String, String> attrsPrin1 = new HashMap<String, String>();
|
||||
static Map<String, String> attrsPriA = new HashMap<String, String>();
|
||||
static Map<String, String> attrsPriB = new HashMap<String, String>();
|
||||
static Map<String, String> attrsPriLP = new HashMap<String, String>();
|
||||
static Map<String, String> attrsPriLN = new HashMap<String, String>();
|
||||
static Map<String, String> attrsPri1 = new HashMap<>();
|
||||
static Map<String, String> attrsPri2 = new HashMap<>();
|
||||
static Map<String, String> attrsPrin1 = new HashMap<>();
|
||||
static Map<String, String> attrsPriA = new HashMap<>();
|
||||
static Map<String, String> attrsPriB = new HashMap<>();
|
||||
static Map<String, String> attrsPriLP = new HashMap<>();
|
||||
static Map<String, String> attrsPriLN = new HashMap<>();
|
||||
|
||||
@BeforeClass
|
||||
@BeforeAll
|
||||
public static void init() {
|
||||
attrsPri1.put(CoreAttributes.PRIORITY.key(), "1");
|
||||
attrsPri2.put(CoreAttributes.PRIORITY.key(), "2");
|
||||
|
@ -57,7 +57,7 @@ public class PriorityAttributePrioritizerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritizer() throws InstantiationException, IllegalAccessException {
|
||||
public void testPrioritizer() {
|
||||
final Processor processor = new SimpleProcessor();
|
||||
final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
|
||||
|
|
Loading…
Reference in New Issue