ARTEMIS-1977 Stripping activemq-artemis as a separated proejct

This commit is contained in:
Clebert Suconic 2019-02-27 12:34:43 -05:00
parent 9e22a048b6
commit 061fb2787e
60 changed files with 125 additions and 3203 deletions

View File

@ -3,16 +3,15 @@ language: java
install: true
# clean out Artemis artifacts from the cache
# whack native libs such that LibAIO is not used till we figure out the resource constraints
before_install:
- rm -rf $HOME/.m2/repository/org/apache/activemq/artemis-*
- rm artemis-native/bin/libartemis-native-*
# use 'install' so smoke-tests will work
# use '-Pextra-tests' to ensure extra-tests compiles even though they won't actually run
# By setting anything to org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory.DISABLED we are disabling libaio loading on the testsuite
script:
- set -e
- mvn -Pfast-tests -Pextra-tests -B install -q -pl '!artemis-web'
- mvn -Dorg.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory.DISABLED=AnythingNotNull -Pfast-tests -Pextra-tests -B install -q -pl '!artemis-web'
- cd examples
- mvn install -Prelease install -B -q

View File

@ -1,18 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
CMAKE_MINIMUM_REQUIRED(VERSION 2.6)
SUBDIRS(artemis-native)

View File

@ -55,8 +55,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>

View File

@ -45,8 +45,8 @@ import org.apache.activemq.artemis.cli.commands.util.HashUtil;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.utils.FileUtil;
/**

View File

@ -32,7 +32,7 @@ import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.utils.ReusableLatch;
/**

View File

@ -65,7 +65,7 @@ import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.management.ManagementContext;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.junit.Wait;

View File

@ -20,7 +20,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoader;

View File

@ -124,8 +124,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>

View File

@ -55,7 +55,7 @@
<include>org.apache.activemq:artemis-jms-server</include>
<include>org.apache.activemq:artemis-journal</include>
<include>org.apache.activemq:artemis-jdbc-store</include>
<include>org.apache.activemq:artemis-native</include>
<include>org.apache.activemq:activemq-artemis-native</include>
<include>org.apache.activemq:artemis-amqp-protocol</include>
<include>org.apache.activemq:artemis-openwire-protocol</include>
<include>org.apache.activemq:artemis-hornetq-protocol</include>
@ -123,7 +123,7 @@
<!-- native -->
<dependencySet>
<includes>
<include>org.apache.activemq:artemis-native</include>
<include>org.apache.activemq:activemq-artemis-native</include>
</includes>
<outputDirectory>bin</outputDirectory>
<unpack>true</unpack>

View File

@ -60,22 +60,6 @@
<exclude>mvnw.cmd</exclude>
<exclude>.mvn/</exclude>
<exclude>artemis_doap.rdf</exclude>
<exclude>artemis-native/bin/</exclude>
<!-- Files generated from automake -->
<exclude>CMakeCache.txt</exclude>
<exclude>CMakeFiles/</exclude>
<exclude>Makefile</exclude>
<exclude>artemis-native/CMakeCache.txt</exclude>
<exclude>artemis-native/CMakeFiles/</exclude>
<exclude>artemis-native/Makefile</exclude>
<exclude>artemis-native/cmake_install.cmake</exclude>
<exclude>artemis-native/src/main/c/CMakeFiles/</exclude>
<exclude>artemis-native/src/main/c/Makefile</exclude>
<exclude>artemis-native/src/main/c/cmake_install.cmake</exclude>
<exclude>artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
<exclude>cmake_install.cmake</exclude>
<!-- build output -->
<exclude>

View File

@ -66,7 +66,7 @@
<bundle dependency="true">mvn:org.apache.commons/commons-text/1.6</bundle>
<bundle dependency="true">mvn:org.apache.commons/commons-lang3/${commons.lang.version}</bundle>
<bundle>mvn:org.apache.activemq/artemis-native/${pom.version}</bundle>
<bundle>mvn:org.apache.activemq/activemq-artemis-native/${activemq-artemis-native-version}</bundle>
<bundle>mvn:org.apache.activemq/artemis-server-osgi/${pom.version}</bundle>
</feature>

View File

@ -64,8 +64,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>

View File

@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;

View File

@ -30,10 +30,10 @@ import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
@ -42,6 +42,19 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
private static final Logger logger = Logger.getLogger(AIOSequentialFileFactory.class);
// This is useful in cases where you want to disable loading the native library. (e.g. testsuite)
private static final boolean DISABLED = System.getProperty(AIOSequentialFileFactory.class.getName() + ".DISABLED") != null;
static {
// This is usually only used on testsuite.
// In case it's used, I would rather have it on the loggers so we know what's happening
if (DISABLED) {
// This is only used in tests, hence I'm not creating a Logger for this
logger.info(AIOSequentialFileFactory.class.getName() + ".DISABLED = true");
}
}
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
private volatile boolean reuseBuffers = true;
@ -114,7 +127,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
}
public static boolean isSupported() {
return LibaioContext.isLoaded();
return !DISABLED && LibaioContext.isLoaded();
}
public static boolean isSupported(File journalPath) {

View File

@ -20,7 +20,7 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
public class ActiveMQFileLock extends FileLock {

View File

@ -1,18 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
CMAKE_MINIMUM_REQUIRED(VERSION 2.6)
SUBDIRS(src/main/c)

View File

@ -1,87 +0,0 @@
# Introduction
This is a simple tutorial about building and packaging the libartemis-native library. The libartemis-native is a thin
layer library that interface with Linux' lib AIO library as part of the journaling feature of the broker when operating
with AIO journal.
The lib AIO is a Linux-specific dependency, therefore having a relatively modern Linux operating system is assumed for
the purpose of this documentation.
## Dependencies
In order to build the package, make sure you install these packages:
- The GNU compiler library container both the C and C++ compiler
- The GNU C library
- The respective libaio package for your Linux distribution
- JDK (full JDK)
For example, on Fedora Linux, compilation of the library requires the following specific packages:
- glibc-devel
- libaio-devel
- gcc
- gcc-g++
- java-1.8.0-openjdk-devel
### Cross compilation
Using a 64-bit Linux OS, it is possible to cross-compile the 32-bit version of the library. For this, the 32-bits
version of the GNU C Library and lib AIO should be installed.
Once again using Fedora Linux as an example, it would mean that the following packages need to be installed:
- glibc-devel.i686
- libaio-devel.i686
## Lib AIO Information
The Lib AIO is the Linux' Kernel Asynchronous I/O Support Library. It is part of the kernel project. The library makes
system calls on the kernel layer.
This is the project information:
Git Repository: git://git.kernel.org/pub/scm/libs/libaio/libaio.git
Mailing List: linux-aio@kvack.org
## Steps to build
1. Make sure you have JAVA_HOME defined, and pointing to the root of your JDK:
Example:
```export JAVA_HOME=/usr/share/jdk1.7```
2. Call compile-native.sh. Bootstrap will call all the initial scripts you need
$> ./compile-native.sh
if you are missing any dependencies, autoconf would tell you what you're missing.
### Compiled File
The produced file will be under the ./target/nar (example: ./target/nar/artemis-native-1.0.0-amd64-Linux-gpp-jni/lib/amd64-Linux-gpp/jni/libartemis-native-1.0.0.so)
and you will have to rename it manually under ./bin following the appropriate pattern.
### Advanced Compilation Methods and Developer-specific Documentation
Passing additional options to the compiler:
```cmake -DCMAKE_USER_C_FLAGS="-fomit-frame-pointer" -DCMAKE_VERBOSE_MAKEFILE=On .```
Compiling with debug options:
```cmake -DCMAKE_BUILD_TYPE=Debug -DCMAKE_VERBOSE_MAKEFILE=On .```
Cross-compilation:
```cmake -DCMAKE_VERBOSE_MAKEFILE=On -DCMAKE_USER_C_FLAGS="-m32" -DARTEMIS_CROSS_COMPILE=On -DARTEMIS_CROSS_COMPILE_ROOT_PATH=/usr/lib .```
Cross-compilation with debugging symbols:
```cmake -DCMAKE_VERBOSE_MAKEFILE=On -DCMAKE_USER_C_FLAGS="-m32" -DARTEMIS_CROSS_COMPILE=On -DARTEMIS_CROSS_COMPILE_ROOT_PATH=/usr/lib .```
## Lib AIO Documentation
The User Manual, chapter 38 (Libaio Native Libraries) will provide more details about our native libraries on libaio.

View File

@ -1,19 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
cmake .
make

View File

@ -1,131 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<name>ActiveMQ Artemis Native POM</name>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-pom</artifactId>
<version>2.7.0-SNAPSHOT</version>
</parent>
<artifactId>artemis-native</artifactId>
<packaging>bundle</packaging>
<dependencies>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-annotations</artifactId>
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.logmanager</groupId>
<artifactId>jboss-logmanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${basedir}/target/output/</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${version.maven.jar.plugin}</version>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>artemis.jni</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>copy-resources-32</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/output/lib/linux-i686/</outputDirectory>
<resources>
<resource>
<directory>bin/</directory>
<includes>
<include>libartemis-native-32.so</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
<execution>
<id>copy-resources-64</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${basedir}/target/output/lib/linux-x86_64/</outputDirectory>
<resources>
<resource>
<directory>bin/</directory>
<includes>
<include>libartemis-native-64.so</include>
</includes>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<properties>
<activemq.basedir>${project.basedir}/..</activemq.basedir>
</properties>
</project>

View File

@ -1,78 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
CMAKE_MINIMUM_REQUIRED(VERSION 2.6)
PROJECT(artemis-native)
SET(${PROJECT_NAME}_MAJOR_VERSION 1)
SET(${PROJECT_NAME}_MINOR_VERSION 0)
SET(${PROJECT_NAME}_PATCH_LEVEL 0)
FIND_PACKAGE(Java)
FIND_PACKAGE(JNI)
if (JNI_FOUND)
message (STATUS "JNI_INCLUDE_DIRS=${JNI_INCLUDE_DIRS}")
message (STATUS "JNI_LIBRARIES=${JNI_LIBRARIES}")
endif()
# You may want to adjust this next line for debugging. The -O3 is removed by default, since it would make debugging
# harder. Nonetheless, it can still be added by passing CMAKE_USER_C_FLAGS
# Also note that define the C99 as the minimum supported standard so the code can be compiled with older GCC versions
# (circa 4.4)
set(CMAKE_C_FLAGS_DEBUG "-Wall -std=c99 -z execstack -fdump-tree-all -Wall -pg -g ${CMAKE_USER_C_FLAGS}")
set(CMAKE_C_FLAGS "-O3 -std=c99 -Wall ${CMAKE_USER_C_FLAGS}")
set(ARTEMIS_LIB_NAME artemis-native-64)
if (CMAKE_SIZEOF_VOID_P EQUAL 4)
set(ARTEMIS_LIB_NAME artemis-native-32)
endif()
set(ARTEMIS_CROSS_COMPILE OFF CACHE BOOL "Cross-compile the native library")
if (ARTEMIS_CROSS_COMPILE)
if (CMAKE_SIZEOF_VOID_P EQUAL 4)
message(FATAL_ERROR "Cannot cross-compile to 32-bit architecture in a 32-bit architecture")
endif()
message(STATUS "Using cross-compilation")
set(ARTEMIS_CROSS_COMPILE_ROOT_PATH /usr/lib)
set(ARTEMIS_LIB_NAME artemis-native-32)
# The Cmake variable CMAKE_FIND_ROOT_PATH cannot be set via CLI, so we have to use a separate variable and then
# set it to that value. We use ARTEMIS_CROSS_COMPILE_ROOT_PATH for that.
set(CMAKE_FIND_ROOT_PATH ${ARTEMIS_CROSS_COMPILE_ROOT_PATH})
endif()
find_library(LIBAIO_LIB NAMES aio)
message(STATUS "Using the following libaio library for linking: ${LIBAIO_LIB}")
INCLUDE_DIRECTORIES(. ${JNI_INCLUDE_DIRS})
ADD_CUSTOM_COMMAND(
OUTPUT org_apache_activemq_artemis_jlibaio_LibaioContext.h
COMMAND javah -cp ../java/ org.apache.activemq.artemis.jlibaio.LibaioContext
DEPENDS ../java/org/apache/activemq/artemis/jlibaio/LibaioContext.java
)
ADD_LIBRARY(artemis-native SHARED org_apache_activemq_artemis_jlibaio_LibaioContext.c org_apache_activemq_artemis_jlibaio_LibaioContext.h exception_helper.h)
target_link_libraries(artemis-native ${LIBAIO_LIB})
set_target_properties(artemis-native PROPERTIES
LIBRARY_OUTPUT_DIRECTORY ../../../bin
LIBRARY_OUTPUT_NAME ${ARTEMIS_LIB_NAME})
message(STATUS "Setting up library as ${ARTEMIS_LIB_NAME} based on current architecture")

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
void throwRuntimeException(JNIEnv* env, char* message);
void throwRuntimeExceptionErrorNo(JNIEnv* env, char* message, int errorNumber);
void throwIOException(JNIEnv* env, char* message);
void throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber);
void throwClosedChannelException(JNIEnv* env);
void throwOutOfMemoryError(JNIEnv* env);
char* exceptionMessage(char* msg, int error);

View File

@ -1,919 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _GNU_SOURCE
// libaio, O_DIRECT and other things won't be available without this define
#define _GNU_SOURCE
#endif
//#define DEBUG
#include <jni.h>
#include <unistd.h>
#include <errno.h>
#include <libaio.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <pthread.h>
#include <limits.h>
#include "org_apache_activemq_artemis_jlibaio_LibaioContext.h"
#include "exception_helper.h"
struct io_control {
io_context_t ioContext;
struct io_event * events;
jobject thisObject;
// This is used to make sure we don't return IOCB while something else is using them
// this is to guarantee the submits could be done concurrently with polling
pthread_mutex_t iocbLock;
pthread_mutex_t pollLock;
// a reusable pool of iocb
struct iocb ** iocb;
int queueSize;
int iocbPut;
int iocbGet;
int used;
};
// We need a fast and reliable way to stop the blocked poller
// for that we need a dumb file,
// We are using a temporary file for this.
int dumbWriteHandler = 0;
char dumbPath[PATH_MAX];
#define ONE_MEGA 1048576l
void * oneMegaBuffer = 0;
pthread_mutex_t oneMegaMutex;
jclass submitClass = NULL;
jmethodID errorMethod = NULL;
jmethodID doneMethod = NULL;
jmethodID libaioContextDone = NULL;
jclass libaioContextClass = NULL;
jclass runtimeExceptionClass = NULL;
jclass ioExceptionClass = NULL;
// util methods
void throwRuntimeException(JNIEnv* env, char* message) {
(*env)->ThrowNew(env, runtimeExceptionClass, message);
}
void throwRuntimeExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) {
char* allocatedMessage = exceptionMessage(message, errorNumber);
(*env)->ThrowNew(env, runtimeExceptionClass, allocatedMessage);
free(allocatedMessage);
}
void throwIOException(JNIEnv* env, char* message) {
(*env)->ThrowNew(env, ioExceptionClass, message);
}
void throwIOExceptionErrorNo(JNIEnv* env, char* message, int errorNumber) {
char* allocatedMessage = exceptionMessage(message, errorNumber);
(*env)->ThrowNew(env, ioExceptionClass, allocatedMessage);
free(allocatedMessage);
}
void throwOutOfMemoryError(JNIEnv* env) {
jclass exceptionClass = (*env)->FindClass(env, "java/lang/OutOfMemoryError");
(*env)->ThrowNew(env, exceptionClass, "");
}
/** Notice: every usage of exceptionMessage needs to release the allocated memory for the sequence of char */
char* exceptionMessage(char* msg, int error) {
if (error < 0) {
// some functions return negative values
// and it's hard to keep track of when to send -error and when not
// this will just take care when things are forgotten
// what would generate a proper error
error = error * -1;
}
//strerror is returning a constant, so no need to free anything coming from strerror
char *result = NULL;
if (asprintf(&result, "%s%s", msg, strerror(error)) == -1) {
fprintf(stderr, "Could not allocate enough memory for the error message: %s/%s\n", msg, strerror(error));
}
return result;
}
static inline short verifyBuffer(int alignment) {
pthread_mutex_lock(&oneMegaMutex);
if (oneMegaBuffer == 0) {
#ifdef DEBUG
fprintf (stdout, "oneMegaBuffer %ld\n", (long) oneMegaBuffer);
#endif
if (posix_memalign(&oneMegaBuffer, alignment, ONE_MEGA) != 0) {
fprintf(stderr, "Could not allocate the 1 Mega Buffer for initializing files\n");
pthread_mutex_unlock(&oneMegaMutex);
return -1;
}
memset(oneMegaBuffer, 0, ONE_MEGA);
}
pthread_mutex_unlock(&oneMegaMutex);
return 0;
}
jint JNI_OnLoad(JavaVM* vm, void* reserved) {
JNIEnv* env;
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
return JNI_ERR;
} else {
int res = pthread_mutex_init(&oneMegaMutex, 0);
if (res) {
fprintf(stderr, "could not initialize mutex on on_load, %d", res);
return JNI_ERR;
}
sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir);
dumbWriteHandler = mkstemp (dumbPath);
#ifdef DEBUG
fprintf (stdout, "Creating temp file %s for dumb writes\n", dumbPath);
fflush(stdout);
#endif
if (dumbWriteHandler < 0) {
fprintf (stderr, "couldn't create stop file handler %s\n", dumbPath);
return JNI_ERR;
}
//
// Accordingly to previous experiences we must hold Global Refs on Classes
// And
//
// Accordingly to IBM recommendations here:
// We don't need to hold a global reference on methods:
// http://www.ibm.com/developerworks/java/library/j-jni/index.html#notc
// Which actually caused core dumps
jclass localRuntimeExceptionClass = (*env)->FindClass(env, "java/lang/RuntimeException");
if (localRuntimeExceptionClass == NULL) {
// pending exception...
return JNI_ERR;
}
runtimeExceptionClass = (jclass) (*env)->NewGlobalRef(env, localRuntimeExceptionClass);
if (runtimeExceptionClass == NULL) {
// out-of-memory!
throwOutOfMemoryError(env);
return JNI_ERR;
}
jclass localIoExceptionClass = (*env)->FindClass(env, "java/io/IOException");
if (localIoExceptionClass == NULL) {
// pending exception...
return JNI_ERR;
}
ioExceptionClass = (jclass) (*env)->NewGlobalRef(env, localIoExceptionClass);
if (ioExceptionClass == NULL) {
// out-of-memory!
throwOutOfMemoryError(env);
return JNI_ERR;
}
submitClass = (*env)->FindClass(env, "org/apache/activemq/artemis/jlibaio/SubmitInfo");
if (submitClass == NULL) {
return JNI_ERR;
}
submitClass = (jclass)(*env)->NewGlobalRef(env, (jobject)submitClass);
errorMethod = (*env)->GetMethodID(env, submitClass, "onError", "(ILjava/lang/String;)V");
if (errorMethod == NULL) {
return JNI_ERR;
}
doneMethod = (*env)->GetMethodID(env, submitClass, "done", "()V");
if (doneMethod == NULL) {
return JNI_ERR;
}
libaioContextClass = (*env)->FindClass(env, "org/apache/activemq/artemis/jlibaio/LibaioContext");
if (libaioContextClass == NULL) {
return JNI_ERR;
}
libaioContextClass = (jclass)(*env)->NewGlobalRef(env, (jobject)libaioContextClass);
libaioContextDone = (*env)->GetMethodID(env, libaioContextClass, "done", "(Lorg/apache/activemq/artemis/jlibaio/SubmitInfo;)V");
if (libaioContextDone == NULL) {
return JNI_ERR;
}
return JNI_VERSION_1_6;
}
}
static inline void closeDumbHandlers() {
if (dumbWriteHandler != 0) {
#ifdef DEBUG
fprintf (stdout, "Closing and removing dump handler %s\n", dumbPath);
#endif
dumbWriteHandler = 0;
close(dumbWriteHandler);
unlink(dumbPath);
}
}
void JNI_OnUnload(JavaVM* vm, void* reserved) {
JNIEnv* env;
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
// Something is wrong but nothing we can do about this :(
return;
} else {
closeDumbHandlers();
if (oneMegaBuffer != 0) {
free(oneMegaBuffer);
oneMegaBuffer = 0;
}
pthread_mutex_destroy(&oneMegaMutex);
// delete global references so the GC can collect them
if (runtimeExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, runtimeExceptionClass);
}
if (ioExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, ioExceptionClass);
}
if (submitClass != NULL) {
(*env)->DeleteGlobalRef(env, (jobject)submitClass);
}
if (libaioContextClass != NULL) {
(*env)->DeleteGlobalRef(env, (jobject)libaioContextClass);
}
}
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_shutdownHook
(JNIEnv * env, jclass clazz) {
closeDumbHandlers();
}
static inline struct io_control * getIOControl(JNIEnv* env, jobject pointer) {
struct io_control * ioControl = (struct io_control *) (*env)->GetDirectBufferAddress(env, pointer);
if (ioControl == NULL) {
throwRuntimeException(env, "Controller not initialized");
}
return ioControl;
}
/**
* remove an iocb from the pool of IOCBs. Returns null if full
*/
static inline struct iocb * getIOCB(struct io_control * control) {
struct iocb * iocb = 0;
pthread_mutex_lock(&(control->iocbLock));
#ifdef DEBUG
fprintf (stdout, "getIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut);
#endif
if (control->used < control->queueSize) {
control->used++;
iocb = control->iocb[control->iocbGet++];
if (control->iocbGet >= control->queueSize) {
control->iocbGet = 0;
}
}
pthread_mutex_unlock(&(control->iocbLock));
return iocb;
}
/**
* Put an iocb back on the pool of IOCBs
*/
static inline void putIOCB(struct io_control * control, struct iocb * iocbBack) {
pthread_mutex_lock(&(control->iocbLock));
#ifdef DEBUG
fprintf (stdout, "putIOCB::used=%d, queueSize=%d, get=%d, put=%d\n", control->used, control->queueSize, control->iocbGet, control->iocbPut);
#endif
control->used--;
control->iocb[control->iocbPut++] = iocbBack;
if (control->iocbPut >= control->queueSize) {
control->iocbPut = 0;
}
pthread_mutex_unlock(&(control->iocbLock));
}
static inline short submit(JNIEnv * env, struct io_control * theControl, struct iocb * iocb) {
int result = io_submit(theControl->ioContext, 1, &iocb);
if (result < 0) {
// Putting the Global Ref and IOCB back in case of a failure
if (iocb->data != NULL && iocb->data != (void *) -1) {
(*env)->DeleteGlobalRef(env, (jobject)iocb->data);
}
putIOCB(theControl, iocb);
throwIOExceptionErrorNo(env, "Error while submitting IO: ", -result);
return 0;
}
return 1;
}
static inline void * getBuffer(JNIEnv* env, jobject pointer) {
return (*env)->GetDirectBufferAddress(env, pointer);
}
JNIEXPORT jboolean JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_lock
(JNIEnv * env, jclass clazz, jint handle) {
return flock(handle, LOCK_EX | LOCK_NB) == 0;
}
/**
* Destroys the individual members of the IOCB pool
* @param theControl the IO Control structure containing an IOCB pool
* @param upperBound the number of elements contained within the pool
*/
static inline void iocb_destroy_members(struct io_control * theControl, int upperBound) {
for (int i = 0; i < upperBound; i++) {
free(theControl->iocb[i]);
}
}
/**
* Destroys an IOCB pool and its members up to a certain limit. Should be used when the IOCB
* pool fails to initialize completely
* @param theControl the IO Control structure containing an IOCB pool
* @param upperBound the number of elements contained within the pool
*/
static inline void iocb_destroy_bounded(struct io_control * theControl, int upperBound) {
iocb_destroy_members(theControl, upperBound);
free(theControl->iocb);
}
/**
* Destroys an IOCB pool and all its members
* @param theControl
*/
static inline void iocb_destroy(struct io_control * theControl) {
iocb_destroy_bounded(theControl, theControl->queueSize);
}
/**
* Everything that is allocated here will be freed at deleteContext when the class is unloaded.
*/
JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_newContext(JNIEnv* env, jobject thisObject, jint queueSize) {
int i = 0;
#ifdef DEBUG
fprintf (stdout, "Initializing context\n");
#endif
struct io_control * theControl = (struct io_control *) malloc(sizeof(struct io_control));
if (theControl == NULL) {
throwOutOfMemoryError(env);
return NULL;
}
int res = io_queue_init(queueSize, &theControl->ioContext);
if (res) {
// Error, so need to release whatever was done before
io_queue_release(theControl->ioContext);
free(theControl);
throwRuntimeExceptionErrorNo(env, "Cannot initialize queue:", res);
return NULL;
}
theControl->iocb = (struct iocb **)malloc((sizeof(struct iocb *) * (size_t)queueSize));
if (theControl->iocb == NULL) {
io_queue_release(theControl->ioContext);
free(theControl);
throwOutOfMemoryError(env);
return NULL;
}
for (i = 0; i < queueSize; i++) {
theControl->iocb[i] = (struct iocb *)malloc(sizeof(struct iocb));
if (theControl->iocb[i] == NULL) {
// It may not have been fully initialized, therefore limit the cleanup up to 'i' members.
iocb_destroy_bounded(theControl, i);
io_queue_release(theControl->ioContext);
free(theControl);
throwOutOfMemoryError(env);
return NULL;
}
}
theControl->queueSize = queueSize;
res = pthread_mutex_init(&(theControl->iocbLock), 0);
if (res) {
iocb_destroy(theControl);
io_queue_release(theControl->ioContext);
free(theControl);
throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res);
return NULL;
}
res = pthread_mutex_init(&(theControl->pollLock), 0);
if (res) {
iocb_destroy(theControl);
io_queue_release(theControl->ioContext);
free(theControl);
throwRuntimeExceptionErrorNo(env, "Can't initialize mutext:", res);
return NULL;
}
theControl->events = (struct io_event *)malloc(sizeof(struct io_event) * (size_t)queueSize);
if (theControl->events == NULL) {
iocb_destroy(theControl);
io_queue_release(theControl->ioContext);
free(theControl);
throwRuntimeExceptionErrorNo(env, "Can't initialize mutext (not enough memory for the events member): ", res);
return NULL;
}
theControl->iocbPut = 0;
theControl->iocbGet = 0;
theControl->used = 0;
theControl->thisObject = (*env)->NewGlobalRef(env, thisObject);
return (*env)->NewDirectByteBuffer(env, theControl, sizeof(struct io_control));
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_deleteContext(JNIEnv* env, jclass clazz, jobject contextPointer) {
int i;
struct io_control * theControl = getIOControl(env, contextPointer);
if (theControl == NULL) {
return;
}
struct iocb * iocb = getIOCB(theControl);
if (iocb == NULL) {
throwIOException(env, "Not enough space in libaio queue");
return;
}
// Submitting a dumb write so the loop finishes
io_prep_pwrite(iocb, dumbWriteHandler, 0, 0, 0);
iocb->data = (void *) -1;
if (!submit(env, theControl, iocb)) {
return;
}
// to make sure the poll has finished
pthread_mutex_lock(&(theControl->pollLock));
pthread_mutex_unlock(&(theControl->pollLock));
// To return any pending IOCBs
int result = io_getevents(theControl->ioContext, 0, 1, theControl->events, 0);
for (i = 0; i < result; i++) {
struct io_event * event = &(theControl->events[i]);
struct iocb * iocbp = event->obj;
putIOCB(theControl, iocbp);
}
io_queue_release(theControl->ioContext);
pthread_mutex_destroy(&(theControl->pollLock));
pthread_mutex_destroy(&(theControl->iocbLock));
iocb_destroy(theControl);
(*env)->DeleteGlobalRef(env, theControl->thisObject);
free(theControl->events);
free(theControl);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_close(JNIEnv* env, jclass clazz, jint fd) {
if (close(fd) < 0) {
throwIOExceptionErrorNo(env, "Error closing file:", errno);
}
}
JNIEXPORT int JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_open(JNIEnv* env, jclass clazz,
jstring path, jboolean direct) {
const char* f_path = (*env)->GetStringUTFChars(env, path, 0);
int res;
if (direct) {
res = open(f_path, O_RDWR | O_CREAT | O_DIRECT, 0666);
} else {
res = open(f_path, O_RDWR | O_CREAT, 0666);
}
(*env)->ReleaseStringUTFChars(env, path, f_path);
if (res < 0) {
throwIOExceptionErrorNo(env, "Cannot open file:", errno);
}
return res;
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitWrite
(JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferWrite, jobject callback) {
struct io_control * theControl = getIOControl(env, contextPointer);
if (theControl == NULL) {
return;
}
#ifdef DEBUG
fprintf (stdout, "submitWrite position %ld, size %d\n", position, size);
#endif
struct iocb * iocb = getIOCB(theControl);
if (iocb == NULL) {
throwIOException(env, "Not enough space in libaio queue");
return;
}
io_prep_pwrite(iocb, fileHandle, getBuffer(env, bufferWrite), (size_t)size, position);
// The GlobalRef will be deleted when poll is called. this is done so
// the vm wouldn't crash if the Callback passed by the user is GCed between submission
// and callback.
// also as the real intention is to hold the reference until the life cycle is complete
iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
submit(env, theControl, iocb);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_submitRead
(JNIEnv * env, jclass clazz, jint fileHandle, jobject contextPointer, jlong position, jint size, jobject bufferRead, jobject callback) {
struct io_control * theControl = getIOControl(env, contextPointer);
if (theControl == NULL) {
return;
}
struct iocb * iocb = getIOCB(theControl);
if (iocb == NULL) {
throwIOException(env, "Not enough space in libaio queue");
return;
}
io_prep_pread(iocb, fileHandle, getBuffer(env, bufferRead), (size_t)size, position);
// The GlobalRef will be deleted when poll is called. this is done so
// the vm wouldn't crash if the Callback passed by the user is GCed between submission
// and callback.
// also as the real intention is to hold the reference until the life cycle is complete
iocb->data = (void *) (*env)->NewGlobalRef(env, callback);
submit(env, theControl, iocb);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
(JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) {
#ifdef DEBUG
fprintf (stdout, "Running blockedPoll\n");
fflush(stdout);
#endif
int i;
struct io_control * theControl = getIOControl(env, contextPointer);
if (theControl == NULL) {
return;
}
int max = theControl->queueSize;
pthread_mutex_lock(&(theControl->pollLock));
short running = 1;
int lastFile = -1;
while (running) {
int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
if (result == -EINTR)
{
// ARTEMIS-353: jmap will issue some weird interrupt signal what would break the execution here
// we need to ignore such calls here
continue;
}
if (result < 0)
{
throwIOExceptionErrorNo(env, "Error while calling io_getevents IO: ", -result);
break;
}
#ifdef DEBUG
fprintf (stdout, "blockedPoll returned %d events\n", result);
fflush(stdout);
#endif
lastFile = -1;
for (i = 0; i < result; i++)
{
#ifdef DEBUG
fprintf (stdout, "blockedPoll treating event %d\n", i);
fflush(stdout);
#endif
struct io_event * event = &(theControl->events[i]);
struct iocb * iocbp = event->obj;
if (iocbp->aio_fildes == dumbWriteHandler) {
#ifdef DEBUG
fprintf (stdout, "Dumb write arrived, giving up the loop\n");
fflush(stdout);
#endif
putIOCB(theControl, iocbp);
running = 0;
break;
}
if (useFdatasync && lastFile != iocbp->aio_fildes) {
lastFile = iocbp->aio_fildes;
fdatasync(lastFile);
}
int eventResult = (int)event->res;
#ifdef DEBUG
fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result);
fflush (stdout);
#endif
if (eventResult < 0) {
#ifdef DEBUG
fprintf (stdout, "Error: %s\n", strerror(-eventResult));
fflush (stdout);
#endif
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
if (iocbp->data != NULL) {
(*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError);
}
}
jobject obj = (jobject)iocbp->data;
putIOCB(theControl, iocbp);
if (obj != NULL) {
(*env)->CallVoidMethod(env, theControl->thisObject, libaioContextDone,obj);
// We delete the globalRef after the completion of the callback
(*env)->DeleteGlobalRef(env, obj);
}
}
}
pthread_mutex_unlock(&(theControl->pollLock));
}
JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_poll
(JNIEnv * env, jobject obj, jobject contextPointer, jobjectArray callbacks, jint min, jint max) {
int i = 0;
struct io_control * theControl = getIOControl(env, contextPointer);
if (theControl == NULL) {
return 0;
}
int result = io_getevents(theControl->ioContext, min, max, theControl->events, 0);
int retVal = result;
for (i = 0; i < result; i++) {
struct io_event * event = &(theControl->events[i]);
struct iocb * iocbp = event->obj;
int eventResult = (int)event->res;
#ifdef DEBUG
fprintf (stdout, "Poll res: %d totalRes=%d\n", eventResult, result);
#endif
if (eventResult < 0) {
#ifdef DEBUG
fprintf (stdout, "Error: %s\n", strerror(-eventResult));
#endif
if (iocbp->data != NULL && iocbp->data != (void *) -1) {
jstring jstrError = (*env)->NewStringUTF(env, strerror(-eventResult));
(*env)->CallVoidMethod(env, (jobject)(iocbp->data), errorMethod, (jint)(-eventResult), jstrError);
}
}
if (iocbp->data != NULL && iocbp->data != (void *) -1) {
(*env)->SetObjectArrayElement(env, callbacks, i, (jobject)iocbp->data);
// We delete the globalRef after the completion of the callback
(*env)->DeleteGlobalRef(env, (jobject)iocbp->data);
}
putIOCB(theControl, iocbp);
}
return retVal;
}
JNIEXPORT jobject JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_newAlignedBuffer
(JNIEnv * env, jclass clazz, jint size, jint alignment) {
if (size % alignment != 0) {
throwRuntimeException(env, "Buffer size needs to be aligned to passed argument");
return NULL;
}
// This will allocate a buffer, aligned by alignment.
// Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
// NOTE: this buffer will contain non initialized data, you must fill it up properly
void * buffer;
int result = posix_memalign(&buffer, (size_t)alignment, (size_t)size);
if (result) {
throwRuntimeExceptionErrorNo(env, "Can't allocate posix buffer:", result);
return NULL;
}
memset(buffer, 0, (size_t)size);
return (*env)->NewDirectByteBuffer(env, buffer, size);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_freeBuffer
(JNIEnv * env, jclass clazz, jobject jbuffer) {
if (jbuffer == NULL)
{
throwRuntimeException(env, "Null pointer");
return;
}
void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
free(buffer);
}
/** It does nothing... just return true to make sure it has all the binary dependencies */
JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getNativeVersion
(JNIEnv * env, jclass clazz)
{
return org_apache_activemq_artemis_jlibaio_LibaioContext_EXPECTED_NATIVE_VERSION;
}
JNIEXPORT jlong JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getSize
(JNIEnv * env, jclass clazz, jint fd)
{
struct stat statBuffer;
if (fstat(fd, &statBuffer) < 0)
{
throwIOExceptionErrorNo(env, "Cannot determine file size:", errno);
return -1l;
}
return statBuffer.st_size;
}
JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getBlockSizeFD
(JNIEnv * env, jclass clazz, jint fd)
{
struct stat statBuffer;
if (fstat(fd, &statBuffer) < 0)
{
throwIOExceptionErrorNo(env, "Cannot determine file size:", errno);
return -1l;
}
return statBuffer.st_blksize;
}
JNIEXPORT jint JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_getBlockSize
(JNIEnv * env, jclass clazz, jstring path)
{
const char* f_path = (*env)->GetStringUTFChars(env, path, 0);
struct stat statBuffer;
if (stat(f_path, &statBuffer) < 0)
{
throwIOExceptionErrorNo(env, "Cannot determine file size:", errno);
return -1l;
}
(*env)->ReleaseStringUTFChars(env, path, f_path);
return statBuffer.st_blksize;
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fallocate
(JNIEnv * env, jclass clazz, jint fd, jlong size)
{
if (fallocate(fd, 0, 0, (off_t) size) < 0)
{
throwIOExceptionErrorNo(env, "Could not preallocate file", errno);
}
fsync(fd);
lseek (fd, 0, SEEK_SET);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fill
(JNIEnv * env, jclass clazz, jint fd, jint alignment, jlong size)
{
int i;
int blocks = size / ONE_MEGA;
int rest = size % ONE_MEGA;
#ifdef DEBUG
fprintf (stdout, "calling fill ... blocks = %d, rest=%d, alignment=%d\n", blocks, rest, alignment);
#endif
verifyBuffer(alignment);
lseek (fd, 0, SEEK_SET);
for (i = 0; i < blocks; i++)
{
if (write(fd, oneMegaBuffer, ONE_MEGA) < 0)
{
#ifdef DEBUG
fprintf (stdout, "Errno is %d\n", errno);
#endif
throwIOException(env, "Cannot initialize file");
return;
}
}
if (rest != 0l)
{
if (write(fd, oneMegaBuffer, rest) < 0)
{
#ifdef DEBUG
fprintf (stdout, "Errno is %d\n", errno);
#endif
throwIOException(env, "Cannot initialize file with final rest");
return;
}
}
lseek (fd, 0, SEEK_SET);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_memsetBuffer
(JNIEnv *env, jclass clazz, jobject jbuffer, jint size)
{
#ifdef DEBUG
fprintf (stdout, "Mem setting buffer with %d bytes\n", size);
#endif
void * buffer = (*env)->GetDirectBufferAddress(env, jbuffer);
if (buffer == 0)
{
throwRuntimeException(env, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
return;
}
memset(buffer, 0, (size_t)size);
}

View File

@ -1,466 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* This class is used as an aggregator for the {@link LibaioFile}.
* <br>
* It holds native data, and it will share a libaio queue that can be used by multiple files.
* <br>
* You need to use the poll methods to read the result of write and read submissions.
* <br>
* You also need to use the special buffer created by {@link LibaioFile} as you need special alignments
* when dealing with O_DIRECT files.
* <br>
* A Single controller can server multiple files. There's no need to create one controller per file.
* <br>
* <a href="https://ext4.wiki.kernel.org/index.php/Clarifying_Direct_IO's_Semantics">Interesting reading for this.</a>
*/
public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
private static final AtomicLong totalMaxIO = new AtomicLong(0);
/**
* This definition needs to match Version.h on the native sources.
* <br>
* Or else the native module won't be loaded because of version mismatches
*/
private static final int EXPECTED_NATIVE_VERSION = 8;
private static boolean loaded = false;
private static final AtomicBoolean shuttingDown = new AtomicBoolean(false);
private static final AtomicInteger contexts = new AtomicInteger(0);
public static boolean isLoaded() {
return loaded;
}
private static boolean loadLibrary(final String name) {
try {
System.loadLibrary(name);
if (getNativeVersion() != EXPECTED_NATIVE_VERSION) {
NativeLogger.LOGGER.incompatibleNativeLibrary();
return false;
} else {
return true;
}
} catch (Throwable e) {
NativeLogger.LOGGER.debug(name + " -> error loading the native library", e);
return false;
}
}
static {
String[] libraries = new String[]{"artemis-native-64", "artemis-native-32"};
for (String library : libraries) {
if (loadLibrary(library)) {
loaded = true;
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
shuttingDown.set(true);
checkShutdown();
}
});
break;
} else {
NativeLogger.LOGGER.debug("Library " + library + " not found!");
}
}
if (!loaded) {
NativeLogger.LOGGER.debug("Couldn't locate LibAIO Wrapper");
}
}
private static void checkShutdown() {
if (contexts.get() == 0 && shuttingDown.get()) {
shutdownHook();
}
}
private static native void shutdownHook();
/**
* This is used to validate leaks on tests.
*
* @return the number of allocated aio, to be used on test checks.
*/
public static long getTotalMaxIO() {
return totalMaxIO.get();
}
/**
* It will reset all the positions on the buffer to 0, using memset.
*
* @param buffer a native buffer.
* s
*/
public void memsetBuffer(ByteBuffer buffer) {
memsetBuffer(buffer, buffer.limit());
}
/**
* This is used on tests validating for leaks.
*/
public static void resetMaxAIO() {
totalMaxIO.set(0);
}
/**
* the native ioContext including the structure created.
*/
private final ByteBuffer ioContext;
private final AtomicBoolean closed = new AtomicBoolean(false);
final Semaphore ioSpace;
final int queueSize;
final boolean useFdatasync;
/**
* The queue size here will use resources defined on the kernel parameter
* <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> .
*
* @param queueSize the size to be initialize on libaio
* io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr.
* @param useSemaphore should block on a semaphore avoiding using more submits than what's available.
* @param useFdatasync should use fdatasync before calling callbacks.
*/
public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) {
try {
contexts.incrementAndGet();
this.ioContext = newContext(queueSize);
this.useFdatasync = useFdatasync;
} catch (Exception e) {
throw e;
}
this.queueSize = queueSize;
totalMaxIO.addAndGet(queueSize);
if (useSemaphore) {
this.ioSpace = new Semaphore(queueSize);
} else {
this.ioSpace = null;
}
}
/**
* Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)}
*
* @param fd the file descriptor
* @param position the write position
* @param size number of bytes to use
* @param bufferWrite the native buffer
* @param callback a callback
* @throws IOException in case of error
*/
public void submitWrite(int fd,
long position,
int size,
ByteBuffer bufferWrite,
Callback callback) throws IOException {
if (closed.get()) {
throw new IOException("Libaio Context is closed!");
}
try {
if (ioSpace != null) {
ioSpace.acquire();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e.getMessage(), e);
}
submitWrite(fd, this.ioContext, position, size, bufferWrite, callback);
}
public void submitRead(int fd,
long position,
int size,
ByteBuffer bufferWrite,
Callback callback) throws IOException {
if (closed.get()) {
throw new IOException("Libaio Context is closed!");
}
try {
if (ioSpace != null) {
ioSpace.acquire();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e.getMessage(), e);
}
submitRead(fd, this.ioContext, position, size, bufferWrite, callback);
}
/**
* This is used to close the libaio queues and cleanup the native data used.
* <br>
* It is unsafe to close the controller while you have pending writes or files open as
* this could cause core dumps or VM crashes.
*/
@Override
public void close() {
if (!closed.getAndSet(true)) {
if (ioSpace != null) {
try {
ioSpace.tryAcquire(queueSize, 10, TimeUnit.SECONDS);
} catch (Exception e) {
NativeLogger.LOGGER.error(e);
}
}
totalMaxIO.addAndGet(-queueSize);
if (ioContext != null) {
deleteContext(ioContext);
}
contexts.decrementAndGet();
checkShutdown();
}
}
@Override
protected void finalize() throws Throwable {
super.finalize();
close();
}
/**
* It will open a file. If you set the direct flag = false then you won't need to use the special buffer.
* Notice: This will create an empty file if the file doesn't already exist.
*
* @param file the file to be open.
* @param direct will set ODIRECT.
* @return It will return a LibaioFile instance.
* @throws IOException in case of error.
*/
public LibaioFile<Callback> openFile(File file, boolean direct) throws IOException {
return openFile(file.getPath(), direct);
}
/**
* It will open a file. If you set the direct flag = false then you won't need to use the special buffer.
* Notice: This will create an empty file if the file doesn't already exist.
*
* @param file the file to be open.
* @param direct should use O_DIRECT when opening the file.
* @return a new open file.
* @throws IOException in case of error.
*/
public LibaioFile<Callback> openFile(String file, boolean direct) throws IOException {
checkNotNull(file, "path");
checkNotNull(ioContext, "IOContext");
// note: the native layer will throw an IOException in case of errors
int res = LibaioContext.open(file, direct);
return new LibaioFile<>(res, this);
}
/**
* It will open a file disassociated with any sort of factory.
* This is useful when you won't use reading / writing through libaio like locking files.
*
* @param file a file name
* @param direct will use O_DIRECT
* @return a new file
* @throws IOException in case of error.
*/
public static LibaioFile openControlFile(String file, boolean direct) throws IOException {
checkNotNull(file, "path");
// note: the native layer will throw an IOException in case of errors
int res = LibaioContext.open(file, direct);
return new LibaioFile<>(res, null);
}
/**
* Checks that the given argument is not null. If it is, throws {@link NullPointerException}.
* Otherwise, returns the argument.
*/
private static <T> T checkNotNull(T arg, String text) {
if (arg == null) {
throw new NullPointerException(text);
}
return arg;
}
/**
* It will poll the libaio queue for results. It should block until min is reached
* Results are placed on the callback.
* <br>
* This shouldn't be called concurrently. You should provide your own synchronization if you need more than one
* Thread polling for any reason.
* <br>
* Notice that the native layer will invoke {@link SubmitInfo#onError(int, String)} in case of failures,
* but it won't call done method for you.
*
* @param callbacks area to receive the callbacks passed on submission.The size of this callback has to
* be greater than the parameter max.
* @param min the minimum number of elements to receive. It will block until this is achieved.
* @param max The maximum number of elements to receive.
* @return Number of callbacks returned.
* @see LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)
* @see LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)
*/
public int poll(Callback[] callbacks, int min, int max) {
int released = poll(ioContext, callbacks, min, max);
if (ioSpace != null) {
if (released > 0) {
ioSpace.release(released);
}
}
return released;
}
/**
* It will start polling and will keep doing until the context is closed.
* This will call callbacks on {@link SubmitInfo#onError(int, String)} and
* {@link SubmitInfo#done()}.
* In case of error, both {@link SubmitInfo#onError(int, String)} and
* {@link SubmitInfo#done()} are called.
*/
public void poll() {
if (!closed.get()) {
blockedPoll(ioContext, useFdatasync);
}
}
/**
* Called from the native layer
*/
private void done(SubmitInfo info) {
info.done();
if (ioSpace != null) {
ioSpace.release();
}
}
/**
* This is the queue for libaio, initialized with queueSize.
*/
private native ByteBuffer newContext(int queueSize);
/**
* Internal method to be used when closing the controller.
*/
private native void deleteContext(ByteBuffer buffer);
/**
* it will return a file descriptor.
*
* @param path the file name.
* @param direct translates as O_DIRECT On open
* @return a fd from open C call.
*/
public static native int open(String path, boolean direct);
public static native void close(int fd);
/**
*/
/**
* Buffers for O_DIRECT need to use posix_memalign.
* <br>
* Documented at {@link LibaioFile#newBuffer(int)}.
*
* @param size needs to be % alignment
* @param alignment the alignment used at the dispositive
* @return a new native buffer used with posix_memalign
*/
public static native ByteBuffer newAlignedBuffer(int size, int alignment);
/**
* This will call posix free to release the inner buffer allocated at {@link #newAlignedBuffer(int, int)}.
*
* @param buffer a native buffer allocated with {@link #newAlignedBuffer(int, int)}.
*/
public static native void freeBuffer(ByteBuffer buffer);
/**
* Documented at {@link LibaioFile#write(long, int, java.nio.ByteBuffer, SubmitInfo)}.
*/
native void submitWrite(int fd,
ByteBuffer libaioContext,
long position,
int size,
ByteBuffer bufferWrite,
Callback callback) throws IOException;
/**
* Documented at {@link LibaioFile#read(long, int, java.nio.ByteBuffer, SubmitInfo)}.
*/
native void submitRead(int fd,
ByteBuffer libaioContext,
long position,
int size,
ByteBuffer bufferWrite,
Callback callback) throws IOException;
/**
* Note: this shouldn't be done concurrently.
* This method will block until the min condition is satisfied on the poll.
* <p/>
* The callbacks will include the original callback sent at submit (read or write).
*/
native int poll(ByteBuffer libaioContext, Callback[] callbacks, int min, int max);
/**
* This method will block as long as the context is open.
*/
native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync);
static native int getNativeVersion();
public static native boolean lock(int fd);
public static native void memsetBuffer(ByteBuffer buffer, int size);
static native long getSize(int fd);
static native int getBlockSizeFD(int fd);
public static int getBlockSize(File path) {
return getBlockSize(path.getAbsolutePath());
}
public static native int getBlockSize(String path);
static native void fallocate(int fd, long size);
static native void fill(int fd, int alignment, long size);
static native void writeInternal(int fd, long position, long size, ByteBuffer bufferWrite) throws IOException;
}

View File

@ -1,137 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* This is an extension to use libaio.
*/
public final class LibaioFile<Callback extends SubmitInfo> implements AutoCloseable {
protected boolean open;
/**
* This represents a structure allocated on the native
* this is a io_context_t
*/
final LibaioContext<Callback> ctx;
private int fd;
LibaioFile(int fd, LibaioContext ctx) {
this.ctx = ctx;
this.fd = fd;
}
public int getBlockSize() {
return LibaioContext.getBlockSizeFD(fd);
}
public boolean lock() {
return LibaioContext.lock(fd);
}
@Override
public void close() throws IOException {
open = false;
LibaioContext.close(fd);
}
/**
* @return The size of the file.
*/
public long getSize() {
return LibaioContext.getSize(fd);
}
/**
* It will submit a write to the queue. The callback sent here will be received on the
* {@link LibaioContext#poll(SubmitInfo[], int, int)}
* In case of the libaio queue is full (e.g. returning E_AGAIN) this method will return false.
* <br>
* Notice: this won't hold a global reference on buffer, callback should hold a reference towards bufferWrite.
* And don't free the buffer until the callback was called as this could crash the VM.
*
* @param position The position on the file to write. Notice this has to be a multiple of 512.
* @param size The size of the buffer to use while writing.
* @param buffer if you are using O_DIRECT the buffer here needs to be allocated by {@link #newBuffer(int)}.
* @param callback A callback to be returned on the poll method.
* @throws java.io.IOException in case of error
*/
public void write(long position, int size, ByteBuffer buffer, Callback callback) throws IOException {
ctx.submitWrite(fd, position, size, buffer, callback);
}
/**
* It will submit a read to the queue. The callback sent here will be received on the
* {@link LibaioContext#poll(SubmitInfo[], int, int)}.
* In case of the libaio queue is full (e.g. returning E_AGAIN) this method will return false.
* <br>
* Notice: this won't hold a global reference on buffer, callback should hold a reference towards bufferWrite.
* And don't free the buffer until the callback was called as this could crash the VM.
* *
*
* @param position The position on the file to read. Notice this has to be a multiple of 512.
* @param size The size of the buffer to use while reading.
* @param buffer if you are using O_DIRECT the buffer here needs to be allocated by {@link #newBuffer(int)}.
* @param callback A callback to be returned on the poll method.
* @throws java.io.IOException in case of error
* @see LibaioContext#poll(SubmitInfo[], int, int)
*/
public void read(long position, int size, ByteBuffer buffer, Callback callback) throws IOException {
ctx.submitRead(fd, position, size, buffer, callback);
}
/**
* It will allocate a buffer to be used on libaio operations.
* Buffers here are allocated with posix_memalign.
* <br>
* You need to explicitly free the buffer created from here using the
* {@link LibaioContext#freeBuffer(java.nio.ByteBuffer)}.
*
* @param size the size of the buffer.
* @return the buffer allocated.
*/
public ByteBuffer newBuffer(int size) {
return LibaioContext.newAlignedBuffer(size, 4 * 1024);
}
/**
* It will preallocate the file with a given size.
*
* @param size number of bytes to be filled on the file
*/
public void fill(int alignment, long size) {
try {
LibaioContext.fill(fd, alignment, size);
} catch (OutOfMemoryError e) {
NativeLogger.LOGGER.debug("Didn't have enough memory to allocate " + size + " bytes in memory, using simple fallocate");
LibaioContext.fallocate(fd, size);
}
}
/**
* It will use fallocate to initialize a file.
*
* @param size number of bytes to be filled on the file
*/
public void fallocate(long size) {
LibaioContext.fallocate(fd, size);
}
}

View File

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;
/**
* Logger Code 16
*
* each message id must be 6 digits long starting with 16, the 3rd digit denotes the level so
*
* INF0 1
* WARN 2
* DEBUG 3
* ERROR 4
* TRACE 5
* FATAL 6
*
* so an INFO message would be 161000 to 161999
*/
@MessageLogger(projectCode = "jlibaio")
public interface NativeLogger extends BasicLogger {
/**
* The journal logger.
*/
NativeLogger LOGGER = Logger.getMessageLogger(NativeLogger.class, NativeLogger.class.getPackage().getName());
@LogMessage(level = Logger.Level.WARN)
@Message(id = 163001, value = "You have a native library with a different version than expected", format = Message.Format.MESSAGE_FORMAT)
void incompatibleNativeLibrary();
}

View File

@ -1,25 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio;
public interface SubmitInfo {
void onError(int errno, String message);
void done();
}

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This packages handles Linux libaio at a low level.
* <br>
* Buffers needs to be specially allocated by {@link org.apache.activemq.artemis.jlibaio.LibaioContext#newAlignedBuffer(int, int)}
* as they need to be aligned to 512 or 4096 when using Direct files.
*/
package org.apache.activemq.artemis.jlibaio;

View File

@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio.util;
import org.apache.activemq.artemis.jlibaio.SubmitInfo;
/**
* this is an utility class where you can reuse Callback objects for your LibaioContext usage.
*/
public class CallbackCache<Callback extends SubmitInfo> {
private final SubmitInfo[] pool;
private int put = 0;
private int get = 0;
private int available = 0;
private final int size;
private final Object lock = new Object();
public CallbackCache(int size) {
this.pool = new SubmitInfo[size];
this.size = size;
}
public Callback get() {
synchronized (lock) {
if (available <= 0) {
return null;
} else {
Callback retValue = (Callback) pool[get];
pool[get] = null;
if (retValue == null) {
throw new NullPointerException("You should initialize the pool before using it");
}
available--;
get++;
if (get >= size) {
get = 0;
}
return retValue;
}
}
}
public CallbackCache put(Callback callback) {
if (callback == null) {
return null;
}
synchronized (lock) {
if (available < size) {
available++;
pool[put++] = callback;
if (put >= size) {
put = 0;
}
}
}
return this;
}
}

View File

@ -1,98 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio.test;
import java.util.HashSet;
import org.apache.activemq.artemis.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.jlibaio.util.CallbackCache;
import org.junit.Assert;
import org.junit.Test;
public class CallbackCachelTest {
@Test
public void testPartiallyInitialized() {
CallbackCache<MyPool> pool = new CallbackCache(100);
for (int i = 0; i < 50; i++) {
pool.put(new MyPool(i));
}
MyPool value = pool.get();
Assert.assertNotNull(value);
pool.put(value);
// add and remove immediately
for (int i = 0; i < 777; i++) {
pool.put(pool.get());
}
HashSet<MyPool> hashValues = new HashSet<>();
MyPool getValue;
while ((getValue = pool.get()) != null) {
hashValues.add(getValue);
}
Assert.assertEquals(50, hashValues.size());
}
static class MyPool implements SubmitInfo {
public final int i;
MyPool(int i) {
this.i = i;
}
public int getI() {
return i;
}
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
MyPool myPool = (MyPool) o;
if (i != myPool.i)
return false;
return true;
}
@Override
public int hashCode() {
return i;
}
}
}

View File

@ -1,768 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio.test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.jlibaio.SubmitInfo;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
* This test is using a different package from {@link LibaioFile}
* as I need to validate public methods on the API
*/
public class LibaioTest {
@BeforeClass
public static void testAIO() {
Assume.assumeTrue(LibaioContext.isLoaded());
File parent = new File("./target");
File file = new File(parent, "testFile");
try {
parent.mkdirs();
boolean failed = false;
try (LibaioContext control = new LibaioContext<>(1, true, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
fileDescriptor.fallocate(4 * 1024);
} catch (Exception e) {
e.printStackTrace();
failed = true;
}
Assume.assumeFalse("There is not enough support to libaio", failed);
} finally {
file.delete();
}
}
/**
* This is just an arbitrary number for a number of elements you need to pass to the libaio init method
* Some of the tests are using half of this number, so if anyone decide to change this please use an even number.
*/
private static final int LIBAIO_QUEUE_SIZE = 50;
@Rule
public TemporaryFolder temporaryFolder;
public LibaioContext<TestInfo> control;
@Before
public void setUpFactory() {
control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true);
}
@After
public void deleteFactory() {
control.close();
validateLibaio();
}
public void validateLibaio() {
Assert.assertEquals(0, LibaioContext.getTotalMaxIO());
}
public LibaioTest() {
/*
* I didn't use /tmp for three reasons
* - Most systems now will use tmpfs which is not compatible with O_DIRECT
* - This would fill up /tmp in case of failures.
* - target is cleaned up every time you do a mvn clean, so it's safer
*/
File parent = new File("./target");
parent.mkdirs();
temporaryFolder = new TemporaryFolder(parent);
}
@Test
public void testOpen() throws Exception {
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
fileDescriptor.close();
}
@Test
public void testInitAndFallocate10M() throws Exception {
testInit(10 * 1024 * 1024);
}
@Test
public void testInitAndFallocate10M100K() throws Exception {
testInit(10 * 1024 * 1024 + 100 * 1024);
}
private void testInit(int size) throws IOException {
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
fileDescriptor.fallocate(size);
ByteBuffer buffer = fileDescriptor.newBuffer(size);
fileDescriptor.read(0, size, buffer, new TestInfo());
TestInfo[] callbacks = new TestInfo[1];
control.poll(callbacks, 1, 1);
fileDescriptor.close();
buffer.position(0);
LibaioFile fileDescriptor2 = control.openFile(temporaryFolder.newFile("test2.bin"), true);
fileDescriptor2.fill(fileDescriptor.getBlockSize(), size);
fileDescriptor2.read(0, size, buffer, new TestInfo());
control.poll(callbacks, 1, 1);
for (int i = 0; i < size; i++) {
Assert.assertEquals(0, buffer.get());
}
LibaioContext.freeBuffer(buffer);
}
@Test
public void testInitAndFallocate10K() throws Exception {
testInit(10 * 4096);
}
@Test
public void testInitAndFallocate20K() throws Exception {
testInit(20 * 4096);
}
@Test
public void testSubmitWriteOnTwoFiles() throws Exception {
File file1 = temporaryFolder.newFile("test.bin");
File file2 = temporaryFolder.newFile("test2.bin");
fillupFile(file1, LIBAIO_QUEUE_SIZE / 2);
fillupFile(file2, LIBAIO_QUEUE_SIZE / 2);
LibaioFile[] fileDescriptor = new LibaioFile[]{control.openFile(file1, true), control.openFile(file2, true)};
Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 4096, fileDescriptor[0].getSize());
Assert.assertEquals((LIBAIO_QUEUE_SIZE / 2) * 4096, fileDescriptor[1].getSize());
Assert.assertEquals(fileDescriptor[0].getBlockSize(), fileDescriptor[1].getBlockSize());
Assert.assertEquals(LibaioContext.getBlockSize(temporaryFolder.getRoot()), LibaioContext.getBlockSize(file1));
Assert.assertEquals(LibaioContext.getBlockSize(file1), LibaioContext.getBlockSize(file2));
System.out.println("blockSize = " + fileDescriptor[0].getBlockSize());
System.out.println("blockSize /tmp= " + LibaioContext.getBlockSize("/tmp"));
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
try {
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'a');
}
TestInfo callback = new TestInfo();
TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
for (int i = 0; i < LIBAIO_QUEUE_SIZE / 2; i++) {
for (LibaioFile file : fileDescriptor) {
file.write(i * 4096, 4096, buffer, callback);
}
}
Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
for (Object returnedCallback : callbacks) {
Assert.assertSame(returnedCallback, callback);
}
for (LibaioFile file : fileDescriptor) {
ByteBuffer bigbuffer = LibaioContext.newAlignedBuffer(4096 * 25, 4096);
file.read(0, 4096 * 25, bigbuffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
for (Object returnedCallback : callbacks) {
Assert.assertSame(returnedCallback, callback);
}
for (int i = 0; i < 4096 * 25; i++) {
Assert.assertEquals((byte) 'a', bigbuffer.get());
}
LibaioContext.freeBuffer(bigbuffer);
file.close();
}
} finally {
LibaioContext.freeBuffer(buffer);
}
}
@Test
public void testSubmitWriteAndRead() throws Exception {
TestInfo callback = new TestInfo();
TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
// ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
try {
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'a');
}
buffer.rewind();
fileDescriptor.write(0, 4096, buffer, callback);
int retValue = control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE);
Assert.assertEquals(1, retValue);
Assert.assertSame(callback, callbacks[0]);
LibaioContext.freeBuffer(buffer);
buffer = LibaioContext.newAlignedBuffer(4096, 4096);
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'B');
}
fileDescriptor.write(0, 4096, buffer, null);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
buffer.rewind();
fileDescriptor.read(0, 4096, buffer, null);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
for (int i = 0; i < 4096; i++) {
Assert.assertEquals('B', buffer.get());
}
} finally {
LibaioContext.freeBuffer(buffer);
fileDescriptor.close();
}
}
@Test
/**
* This file is making use of libaio without O_DIRECT
* We won't need special buffers on this case.
*/ public void testSubmitWriteAndReadRegularBuffers() throws Exception {
TestInfo callback = new TestInfo();
TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
File file = temporaryFolder.newFile("test.bin");
fillupFile(file, LIBAIO_QUEUE_SIZE);
LibaioFile fileDescriptor = control.openFile(file, false);
final int BUFFER_SIZE = 50;
ByteBuffer buffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
try {
for (int i = 0; i < BUFFER_SIZE; i++) {
buffer.put((byte) 'a');
}
buffer.rewind();
fileDescriptor.write(0, BUFFER_SIZE, buffer, callback);
int retValue = control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE);
System.out.println("Return from poll::" + retValue);
Assert.assertEquals(1, retValue);
Assert.assertSame(callback, callbacks[0]);
buffer.rewind();
for (int i = 0; i < BUFFER_SIZE; i++) {
buffer.put((byte) 'B');
}
fileDescriptor.write(0, BUFFER_SIZE, buffer, null);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
buffer.rewind();
fileDescriptor.read(0, 50, buffer, null);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
for (int i = 0; i < BUFFER_SIZE; i++) {
Assert.assertEquals('B', buffer.get());
}
} finally {
fileDescriptor.close();
}
}
@Test
public void testSubmitRead() throws Exception {
TestInfo callback = new TestInfo();
TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
File file = temporaryFolder.newFile("test.bin");
fillupFile(file, LIBAIO_QUEUE_SIZE);
LibaioFile fileDescriptor = control.openFile(file, true);
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
final int BUFFER_SIZE = 4096;
try {
for (int i = 0; i < BUFFER_SIZE; i++) {
buffer.put((byte) '@');
}
fileDescriptor.write(0, BUFFER_SIZE, buffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
Assert.assertSame(callback, callbacks[0]);
buffer.rewind();
fileDescriptor.read(0, BUFFER_SIZE, buffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
Assert.assertSame(callback, callbacks[0]);
for (int i = 0; i < BUFFER_SIZE; i++) {
Assert.assertEquals('@', buffer.get());
}
} finally {
LibaioContext.freeBuffer(buffer);
fileDescriptor.close();
}
}
@Test
public void testInvalidWrite() throws Exception {
TestInfo callback = new TestInfo();
TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
File file = temporaryFolder.newFile("test.bin");
fillupFile(file, LIBAIO_QUEUE_SIZE);
LibaioFile fileDescriptor = control.openFile(file, true);
try {
ByteBuffer buffer = ByteBuffer.allocateDirect(300);
for (int i = 0; i < 300; i++) {
buffer.put((byte) 'z');
}
fileDescriptor.write(0, 300, buffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, LIBAIO_QUEUE_SIZE));
Assert.assertTrue(callbacks[0].isError());
// Error condition
Assert.assertSame(callbacks[0], callback);
System.out.println("Error:" + callbacks[0]);
buffer = fileDescriptor.newBuffer(4096);
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'z');
}
callback = new TestInfo();
fileDescriptor.write(0, 4096, buffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, 1));
Assert.assertSame(callback, callbacks[0]);
fileDescriptor.write(5, 4096, buffer, callback);
Assert.assertEquals(1, control.poll(callbacks, 1, 1));
Assert.assertTrue(callbacks[0].isError());
callbacks = null;
callback = null;
TestInfo.checkLeaks();
} finally {
fileDescriptor.close();
}
}
@Test
public void testLeaks() throws Exception {
File file = temporaryFolder.newFile("test.bin");
fillupFile(file, LIBAIO_QUEUE_SIZE * 2);
TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
LibaioFile<TestInfo> fileDescriptor = control.openFile(file, true);
ByteBuffer bufferWrite = LibaioContext.newAlignedBuffer(4096, 4096);
try {
for (int i = 0; i < 4096; i++) {
bufferWrite.put((byte) 'B');
}
for (int j = 0; j < LIBAIO_QUEUE_SIZE * 2; j++) {
for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) {
TestInfo countClass = new TestInfo();
fileDescriptor.write(i * 4096, 4096, bufferWrite, countClass);
}
Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) {
Assert.assertNotNull(callbacks[i]);
callbacks[i] = null;
}
}
TestInfo.checkLeaks();
} finally {
LibaioContext.freeBuffer(bufferWrite);
}
}
@Test
public void testLock() throws Exception {
File file = temporaryFolder.newFile("test.bin");
LibaioFile fileDescriptor = control.openFile(file, true);
fileDescriptor.lock();
fileDescriptor.close();
}
@Test
public void testAlloc() throws Exception {
File file = temporaryFolder.newFile("test.bin");
LibaioFile fileDescriptor = control.openFile(file, true);
fileDescriptor.fill(fileDescriptor.getBlockSize(),10 * 1024 * 1024);
fileDescriptor.close();
}
@Test
public void testReleaseNullBuffer() throws Exception {
boolean failed = false;
try {
LibaioContext.freeBuffer(null);
} catch (Exception expected) {
failed = true;
}
Assert.assertTrue("Exception happened!", failed);
}
@Test
public void testMemset() throws Exception {
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096 * 8, 4096);
for (int i = 0; i < buffer.capacity(); i++) {
buffer.put((byte) 'z');
}
buffer.position(0);
for (int i = 0; i < buffer.capacity(); i++) {
Assert.assertEquals((byte) 'z', buffer.get());
}
control.memsetBuffer(buffer);
buffer.position(0);
for (int i = 0; i < buffer.capacity(); i++) {
Assert.assertEquals((byte) 0, buffer.get());
}
LibaioContext.freeBuffer(buffer);
}
@Test
public void testIOExceptionConditions() throws Exception {
boolean exceptionThrown = false;
control.close();
control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true);
try {
// There is no space for a queue this huge, the native layer should throw the exception
LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true);
} catch (RuntimeException e) {
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
exceptionThrown = false;
try {
// this should throw an exception, we shouldn't be able to open a directory!
control.openFile(temporaryFolder.getRoot(), true);
} catch (IOException expected) {
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
exceptionThrown = false;
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile(), true);
fileDescriptor.close();
try {
fileDescriptor.close();
} catch (IOException expected) {
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
fileDescriptor = control.openFile(temporaryFolder.newFile(), true);
ByteBuffer buffer = fileDescriptor.newBuffer(4096);
try {
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'a');
}
for (int i = 0; i < LIBAIO_QUEUE_SIZE; i++) {
fileDescriptor.write(i * 4096, 4096, buffer, new TestInfo());
}
boolean ex = false;
try {
fileDescriptor.write(0, 4096, buffer, new TestInfo());
} catch (Exception e) {
ex = true;
}
Assert.assertTrue(ex);
TestInfo[] callbacks = new TestInfo[LIBAIO_QUEUE_SIZE];
Assert.assertEquals(LIBAIO_QUEUE_SIZE, control.poll(callbacks, LIBAIO_QUEUE_SIZE, LIBAIO_QUEUE_SIZE));
// it should be possible to write now after queue space being released
fileDescriptor.write(0, 4096, buffer, new TestInfo());
Assert.assertEquals(1, control.poll(callbacks, 1, 100));
TestInfo errorCallback = new TestInfo();
// odd positions will have failures through O_DIRECT
fileDescriptor.read(3, 4096, buffer, errorCallback);
Assert.assertEquals(1, control.poll(callbacks, 1, 50));
Assert.assertTrue(callbacks[0].isError());
Assert.assertSame(errorCallback, (callbacks[0]));
// to help GC and the checkLeaks
callbacks = null;
errorCallback = null;
TestInfo.checkLeaks();
exceptionThrown = false;
try {
LibaioContext.newAlignedBuffer(300, 4096);
} catch (RuntimeException e) {
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
exceptionThrown = false;
try {
LibaioContext.newAlignedBuffer(-4096, 4096);
} catch (RuntimeException e) {
exceptionThrown = true;
}
Assert.assertTrue(exceptionThrown);
} finally {
LibaioContext.freeBuffer(buffer);
}
}
@Test
public void testBlockedCallback() throws Exception {
final LibaioContext blockedContext = new LibaioContext(LIBAIO_QUEUE_SIZE, true, true);
Thread t = new Thread() {
@Override
public void run() {
blockedContext.poll();
}
};
t.start();
int NUMBER_OF_BLOCKS = LIBAIO_QUEUE_SIZE * 10;
final CountDownLatch latch = new CountDownLatch(NUMBER_OF_BLOCKS);
File file = temporaryFolder.newFile("sub-file.txt");
LibaioFile aioFile = blockedContext.openFile(file, true);
aioFile.fill(aioFile.getBlockSize(),NUMBER_OF_BLOCKS * 4096);
final AtomicInteger errors = new AtomicInteger(0);
class MyCallback implements SubmitInfo {
@Override
public void onError(int errno, String message) {
errors.incrementAndGet();
}
@Override
public void done() {
latch.countDown();
}
}
MyCallback callback = new MyCallback();
ByteBuffer buffer = LibaioContext.newAlignedBuffer(4096, 4096);
for (int i = 0; i < 4096; i++) {
buffer.put((byte) 'a');
}
long start = System.currentTimeMillis();
for (int i = 0; i < NUMBER_OF_BLOCKS; i++) {
aioFile.write(i * 4096, 4096, buffer, callback);
}
long end = System.currentTimeMillis();
latch.await();
System.out.println("time = " + (end - start) + " writes/second=" + NUMBER_OF_BLOCKS * 1000L / (end - start));
blockedContext.close();
t.join();
}
private void fillupFile(File file, int blocks) throws IOException {
FileOutputStream fileOutputStream = new FileOutputStream(file);
byte[] bufferWrite = new byte[4096];
for (int i = 0; i < 4096; i++) {
bufferWrite[i] = (byte) 0;
}
for (int i = 0; i < blocks; i++) {
fileOutputStream.write(bufferWrite);
}
fileOutputStream.close();
}
static class TestInfo implements SubmitInfo {
static AtomicInteger count = new AtomicInteger();
@Override
protected void finalize() throws Throwable {
super.finalize();
count.decrementAndGet();
}
public static void checkLeaks() throws InterruptedException {
for (int i = 0; count.get() != 0 && i < 50; i++) {
WeakReference reference = new WeakReference(new Object());
while (reference.get() != null) {
System.gc();
Thread.sleep(100);
}
}
Assert.assertEquals(0, count.get());
}
boolean error = false;
String errorMessage;
int errno;
TestInfo() {
count.incrementAndGet();
}
@Override
public void onError(int errno, String message) {
this.errno = errno;
this.errorMessage = message;
this.error = true;
}
@Override
public void done() {
}
public int getErrno() {
return errno;
}
public void setErrno(int errno) {
this.errno = errno;
}
public boolean isError() {
return error;
}
public void setError(boolean error) {
this.error = error;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
}
}

View File

@ -1,179 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jlibaio.test;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.jlibaio.SubmitInfo;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class OpenCloseContextTest {
@BeforeClass
public static void testAIO() {
Assume.assumeTrue(LibaioContext.isLoaded());
}
@Rule
public TemporaryFolder folder;
public OpenCloseContextTest() {
folder = new TemporaryFolder(new File("./target"));
}
@Test
public void testRepeatOpenCloseContext() throws Exception {
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
for (int i = 0; i < 512; i++)
buffer.put((byte) 'x');
for (int i = 0; i < 10; i++) {
System.out.println("#test " + i);
final LibaioContext control = new LibaioContext<>(5, true, true);
Thread t = new Thread() {
@Override
public void run() {
control.poll();
}
};
t.start();
LibaioFile file = control.openFile(folder.newFile(), true);
file.fill(file.getBlockSize(),4 * 1024);
final CountDownLatch insideMethod = new CountDownLatch(1);
final CountDownLatch awaitInside = new CountDownLatch(1);
file.write(0, 512, buffer, new SubmitInfo() {
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
insideMethod.countDown();
try {
awaitInside.await();
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("done");
}
});
insideMethod.await();
file.write(512, 512, buffer, new SubmitInfo() {
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
}
});
awaitInside.countDown();
control.close();
t.join();
}
}
@Test
public void testRepeatOpenCloseContext2() throws Exception {
ByteBuffer buffer = LibaioContext.newAlignedBuffer(512, 512);
for (int i = 0; i < 512; i++)
buffer.put((byte) 'x');
for (int i = 0; i < 10; i++) {
System.out.println("#test " + i);
final LibaioContext control = new LibaioContext<>(5, true, true);
Thread t = new Thread() {
@Override
public void run() {
control.poll();
}
};
t.start();
LibaioFile file = control.openFile(folder.newFile(), true);
file.fill(file.getBlockSize(), 4 * 1024);
final CountDownLatch insideMethod = new CountDownLatch(1);
final CountDownLatch awaitInside = new CountDownLatch(1);
file.write(0, 512, buffer, new SubmitInfo() {
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
insideMethod.countDown();
try {
awaitInside.await(100, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("done");
}
});
insideMethod.await();
file.write(512, 512, buffer, new SubmitInfo() {
@Override
public void onError(int errno, String message) {
}
@Override
public void done() {
}
});
awaitInside.countDown();
control.close();
t.join();
}
}
@Test
public void testCloseAndStart() throws Exception {
final LibaioContext control2 = new LibaioContext<>(5, true, true);
final LibaioContext control = new LibaioContext<>(5, true, true);
control.close();
control.poll();
control2.close();
control2.poll();
System.out.println("Hello!!");
}
}

View File

@ -79,8 +79,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>

View File

@ -88,6 +88,7 @@ import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@ -130,7 +131,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
@ -608,7 +609,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
public static JournalType getDefaultJournalType() {
if (LibaioContext.isLoaded()) {
if (AIOSequentialFileFactory.isSupported()) {
return JournalType.ASYNCIO;
} else {
return JournalType.NIO;

View File

@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.management.ManagementContext;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoader;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;

64
pom.xml
View File

@ -48,7 +48,6 @@
<module>artemis-jms-client-all</module>
<module>artemis-jms-client-osgi</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-ra</module>
<module>artemis-rest</module>
@ -75,6 +74,7 @@
<!-- base url for site deployment. See distribution management for full url. Override this in settings.xml for staging -->
<staging.siteURL>scp://people.apache.org/x1/www/activemq.apache.org</staging.siteURL>
<activemq-artemis-native-version>1.0.0</activemq-artemis-native-version>
<karaf.version>4.0.6</karaf.version>
<pax.exam.version>4.9.1</pax.exam.version>
<commons.config.version>2.4</commons.config.version>
@ -168,7 +168,7 @@
<activemq-surefire-argline>-Dbrokerconfig.maxDiskUsage=100 -Djava.util.logging.manager=org.jboss.logmanager.LogManager
-Dlogging.configuration="file:${activemq.basedir}/tests/config/logging.properties"
-Djava.library.path="${activemq.basedir}/artemis-native/bin" -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
-Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
-Djava.net.preferIPv4Stack=true -Dbasedir=${basedir}
</activemq-surefire-argline>
<activemq.basedir>${project.basedir}</activemq.basedir>
@ -784,6 +784,7 @@
<!-- License: Apache 2.0 -->
</dependency>
</dependencies>
</dependencyManagement>
<profiles>
@ -874,7 +875,6 @@
<module>artemis-server</module>
<module>artemis-jms-client</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-protocols</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
@ -911,7 +911,6 @@
<module>artemis-jms-client</module>
<module>artemis-jms-client-all</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
@ -968,7 +967,6 @@
<module>artemis-server</module>
<module>artemis-jms-client</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
@ -1009,7 +1007,6 @@
<module>artemis-server</module>
<module>artemis-jms-client</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
@ -1041,7 +1038,6 @@
<module>artemis-server</module>
<module>artemis-jms-client</module>
<module>artemis-jms-server</module>
<module>artemis-native</module>
<module>artemis-journal</module>
<module>artemis-jdbc-store</module>
<module>artemis-ra</module>
@ -1072,7 +1068,7 @@
<activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
-Dlogging.configuration="file:${activemq.basedir}/tests/config/logging.properties"
-Djava.library.path="${activemq.basedir}/artemis-native/bin" -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
-Djava.library.path="${activemq.basedir}/activemq-artemis-native/bin" -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
-Djava.net.preferIPv4Stack=true -Dbasedir=${basedir}
@{jacoco.agent} -Djacoco.agent=@{jacoco.agent}
</activemq-surefire-argline>
@ -1181,7 +1177,7 @@
<fileset dir="${activemq.basedir}/artemis-jms-client/target/classes" />
<fileset dir="${activemq.basedir}/artemis-jms-server/target/classes" />
<fileset dir="${activemq.basedir}/artemis-journal/target/classes" />
<fileset dir="${activemq.basedir}/artemis-native/target/classes" />
<fileset dir="${activemq.basedir}/activemq-artemis-native/target/classes" />
<fileset dir="${activemq.basedir}/artemis-ra/target/classes" />
<fileset dir="${activemq.basedir}/artemis-rest/target/classes" />
<fileset dir="${activemq.basedir}/artemis-selector/target/classes" />
@ -1202,7 +1198,7 @@
<fileset dir="${activemq.basedir}/artemis-jms-client/src/main/java" />
<fileset dir="${activemq.basedir}/artemis-jms-server/src/main/java" />
<fileset dir="${activemq.basedir}/artemis-journal/src/main/java" />
<fileset dir="${activemq.basedir}/artemis-native/src/main/java" />
<fileset dir="${activemq.basedir}/activemq-artemis-native/src/main/java" />
<fileset dir="${activemq.basedir}/artemis-ra/src/main/java" />
<fileset dir="${activemq.basedir}/artemis-rest/src/main/java" />
<fileset dir="${activemq.basedir}/artemis-selector/src/main/java" />
@ -1624,7 +1620,7 @@
<exclude>**/CMakeFiles/</exclude>
<exclude>**/Makefile</exclude>
<exclude>**/cmake_install.cmake</exclude>
<exclude>artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
<exclude>activemq-artemis-native/src/main/c/org_apache_activemq_artemis_jlibaio_LibaioContext.h</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
</excludes>
@ -1662,6 +1658,38 @@
</executions>
</plugin>
<!-- This is placing the .so somewhere other than the distribution so testsuite can take it
This is to avoid a dependency on having to build a distribution in order to run tests.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<id>copy</id>
<phase>generate-sources</phase>
<goals>
<goal>unpack</goal>
</goals>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/bin</outputDirectory>
<includes>**/*.so</includes>
</artifactItem>
</artifactItems>
</configuration>
</plugin>
</plugins>
</build>
@ -1739,4 +1767,18 @@
</plugins>
</reporting>
<!--
un comment this session here to validate repository releases.
<repositories>
<repository>
<id>release validation</id>
<name>Maven 1.0.0 release</name>
<layout>default</layout>
<url>https://repository.apache.org/content/repositories/orgapacheactivemq-XXXX</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories> -->
</project>

View File

@ -34,7 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;

View File

@ -138,8 +138,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>

View File

@ -139,8 +139,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>

View File

@ -16,7 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test;

View File

@ -21,7 +21,7 @@ import java.io.File;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
import org.junit.Assert;
import org.junit.Before;

View File

@ -21,7 +21,7 @@ import java.io.File;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
import org.junit.Assert;
import org.junit.Before;

View File

@ -30,7 +30,7 @@ import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.SpawnedTestBase;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.junit.Assert;

View File

@ -71,7 +71,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;

View File

@ -36,7 +36,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;

View File

@ -25,7 +25,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;

View File

@ -54,8 +54,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>

View File

@ -22,7 +22,7 @@ import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;

View File

@ -45,8 +45,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>

View File

@ -30,7 +30,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;

View File

@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;

View File

@ -39,7 +39,7 @@ import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Before;

View File

@ -55,8 +55,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>

View File

@ -20,7 +20,7 @@ import java.util.ArrayList;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
import org.junit.After;

View File

@ -90,8 +90,8 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-native</artifactId>
<version>${project.version}</version>
<artifactId>activemq-artemis-native</artifactId>
<version>${activemq-artemis-native-version}</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>

View File

@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;

View File

@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFile;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.unit.UnitTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;

View File

@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;

View File

@ -26,7 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After;
import org.junit.Assert;