# frozen_string_literal: true

module TurboTests
  # Runs RSpec files in several `bundle exec rspec` subprocesses at once.
  #
  # Each subprocess writes one JSON document per line to a named pipe under
  # tmp/test-pipes. A reader thread per subprocess parses those lines and puts
  # them on a shared queue, which the main thread consumes and forwards to the
  # reporter.
  class Runner
    # Builds a reporter from the given options, then creates a Runner and runs it.
    #
    # @param opts [Hash] options; the keys read here are :files, :formatters,
    #   :seed, :start_time (defaults to Time.now), :verbose (false),
    #   :fail_fast (nil), :use_runtime_info (false),
    #   :retry_and_log_flaky_tests (false), :profile and
    #   :profile_print_slowest_examples_count
    # @return whatever {#run} returns
    def self.run(opts = {})
      files = opts[:files]
      formatters = opts[:formatters]
      seed = opts[:seed]
      start_time = opts.fetch(:start_time) { Time.now }
      verbose = opts.fetch(:verbose, false)
      fail_fast = opts.fetch(:fail_fast, nil)
      use_runtime_info = opts.fetch(:use_runtime_info, false)
      retry_and_log_flaky_tests = opts.fetch(:retry_and_log_flaky_tests, false)

      STDOUT.puts "VERBOSE" if verbose

      reporter =
        Reporter.from_config(
          formatters,
          start_time,
          max_timings_count: opts[:profile_print_slowest_examples_count],
        )

      if ENV["GITHUB_ACTIONS"]
        RSpec.configure do |config|
          # Enable color output in GitHub Actions
          # This eventually will be `config.color_mode = :on` in RSpec 4?
          config.tty = true
          config.color = true
        end
      end

      new(
        reporter: reporter,
        files: files,
        verbose: verbose,
        fail_fast: fail_fast,
        use_runtime_info: use_runtime_info,
        seed: seed,
        profile: opts[:profile],
        retry_and_log_flaky_tests: retry_and_log_flaky_tests,
      ).run
    end

    # Lists the directories directly under spec/ as relative "spec/<name>" paths.
    #
    # Entries are listed from "#{Rails.root}/spec", while the directory check
    # uses the relative path "spec/<name>".
    #
    # @return [Array<String>]
    def self.default_spec_folders
      # We do not want to include system specs by default, they are quite slow.
      excluded = %w[.. . system]

      Dir
        .entries("#{Rails.root}/spec")
        .select { |entry| File.directory?("spec/#{entry}") && !excluded.include?(entry) }
        .map { |entry| "spec/#{entry}" }
    end

    # @param opts [Hash] reads :reporter, :files, :verbose, :fail_fast,
    #   :use_runtime_info, :seed, :profile and :retry_and_log_flaky_tests
    def initialize(opts)
      @reporter = opts[:reporter]
      @files = opts[:files]
      @verbose = opts[:verbose]
      @fail_fast = opts[:fail_fast]
      @use_runtime_info = opts[:use_runtime_info]
      @seed = opts[:seed]
      @profile = opts[:profile]
      @retry_and_log_flaky_tests = opts[:retry_and_log_flaky_tests]
      @failure_count = 0

      # Messages produced by the subprocess threads, consumed by #handle_messages.
      @messages = Queue.new
      @threads = []
      @error = false
    end

    # Checks for pending migrations, splits the files into groups, starts one
    # multisite subprocess plus one subprocess per group, and processes their
    # messages until every subprocess has reported "exit".
    #
    # @return [Boolean, nil] true when no example failed and no subprocess
    #   reported an error. When flaky-test retry is enabled and the number of
    #   failures is within the threshold, the result of the rerun command
    #   (the value of Kernel#system) is returned instead.
    def run
      check_for_migrations

      @num_processes = ParallelTests.determine_number_of_processes(nil)

      group_opts = {}
      group_opts[:runtime_log] = "tmp/turbo_rspec_runtime.log" if @use_runtime_info

      tests_in_groups =
        ParallelTests::RSpec::Runner.tests_in_groups(@files, @num_processes, **group_opts)

      setup_tmp_dir

      @reporter.add_formatter(Flaky::FailuresLoggerFormatter.new) if @retry_and_log_flaky_tests

      subprocess_opts = { record_runtime: @use_runtime_info }

      start_multisite_subprocess(@files, **subprocess_opts)

      tests_in_groups.each_with_index do |tests, process_id|
        start_regular_subprocess(tests, process_id + 1, **subprocess_opts)
      end

      handle_messages

      @reporter.finish

      @threads.each(&:join)

      # The exit-status watcher of a subprocess can enqueue its "error" message
      # after #handle_messages has already seen the final "exit". Pick those up
      # now so a non-zero exit status is never silently ignored.
      collect_late_errors

      if @retry_and_log_flaky_tests && @reporter.failed_examples.present?
        retry_failed_examples_threshold = 10

        if @reporter.failed_examples.length <= retry_failed_examples_threshold
          STDOUT.puts "Retrying failed examples and logging flaky tests..."
          return rerun_failed_examples(@reporter.failed_examples)
        else
          STDOUT.puts "Retry and log flaky tests was enabled but ignored because there are more than #{retry_failed_examples_threshold} failures."
        end
      end

      @reporter.failed_examples.empty? && !@error
    end

    protected

    # Connects to the "discourse_test_1" database using the "test" configuration
    # and exits the process with status 1 when migrations are pending. The
    # connection is closed in every case.
    def check_for_migrations
      config =
        ActiveRecord::Base
          .configurations
          .find_db_config("test")
          .configuration_hash
          .merge("database" => "discourse_test_1")

      ActiveRecord::Tasks::DatabaseTasks.migrations_paths = %w[db/migrate db/post_migrate]

      conn = ActiveRecord::Base.establish_connection(config).connection
      begin
        ActiveRecord::Migration.check_pending!(conn)
      rescue ActiveRecord::PendingMigrationError
        puts "There are pending migrations, run rake parallel:migrate"
        exit 1
      ensure
        conn.close
      end
    end

    # Recreates tmp/test-pipes as an empty directory.
    def setup_tmp_dir
      begin
        FileUtils.rm_r("tmp/test-pipes")
      rescue Errno::ENOENT
        # Nothing to remove: the directory did not exist yet.
      end

      FileUtils.mkdir_p("tmp/test-pipes/")
    end

    # Reruns the tests listed by Flaky::Manager.potential_flaky_tests in a
    # single rspec process.
    #
    # @param failed_examples not used by this method; the list of tests to
    #   rerun comes from Flaky::Manager
    # @return [Boolean, nil] the result of Kernel#system
    def rerun_failed_examples(failed_examples)
      command = [
        "bundle",
        "exec",
        "rspec",
        "--format",
        "documentation",
        "--format",
        "TurboTests::Flaky::FlakyDetectorFormatter",
        *Flaky::Manager.potential_flaky_tests,
      ]

      system(*command)
    end

    # Starts the subprocess that runs only examples tagged type:multisite.
    def start_multisite_subprocess(tests, **opts)
      start_subprocess({}, %w[--tag type:multisite], tests, "multisite", **opts)
    end

    # Starts a subprocess that excludes examples tagged type:multisite and has
    # TEST_ENV_NUMBER set to the given process id.
    def start_regular_subprocess(tests, process_id, **opts)
      start_subprocess(
        { "TEST_ENV_NUMBER" => process_id.to_s },
        %w[--tag ~type:multisite],
        tests,
        process_id,
        **opts,
      )
    end

    # Spawns one rspec subprocess and the threads that service it.
    #
    # When `tests` is empty no process is spawned and an "exit" message is
    # enqueued right away, so #handle_messages still counts this slot.
    #
    # @param env [Hash] environment for the subprocess (mutated here)
    # @param extra_args [Array<String>] extra rspec command line arguments
    # @param tests [Array<String>] files passed to rspec
    # @param process_id [Integer, String] identifier used in pipe names and messages
    # @param record_runtime [Boolean] whether to add the ParallelTests runtime logger
    def start_subprocess(env, extra_args, tests, process_id, record_runtime:)
      if tests.empty?
        @messages << { type: "exit", process_id: process_id }
      else
        tmp_filename = "tmp/test-pipes/subprocess-#{process_id}"

        begin
          File.mkfifo(tmp_filename)
        rescue Errno::EEXIST
          # The pipe is already there; reuse it.
        end

        env["RSPEC_SILENCE_FILTER_ANNOUNCEMENTS"] = "1"

        record_runtime_options =
          if record_runtime
            %w[--format ParallelTests::RSpec::RuntimeLogger --out tmp/turbo_rspec_runtime.log]
          else
            []
          end

        command = [
          "bundle",
          "exec",
          "rspec",
          *extra_args,
          "--order",
          "random:#{@seed}",
          "--format",
          "TurboTests::JsonRowsFormatter",
          "--out",
          tmp_filename,
          *record_runtime_options,
          *tests,
        ]

        env["DISCOURSE_RSPEC_PROFILE_EACH_EXAMPLE"] = "1" if @profile

        # Human-readable form of the invocation, attached to every message.
        command_string = [env.map { |k, v| "#{k}=#{v}" }.join(" "), command.join(" ")].join(" ")

        if @verbose
          STDOUT.puts "::group::[#{process_id}] Run RSpec" if ENV["GITHUB_ACTIONS"]
          STDOUT.puts "Process #{process_id}: #{command_string}"
          STDOUT.puts "::endgroup::" if ENV["GITHUB_ACTIONS"]
        end

        stdin, stdout, stderr, wait_thr = Open3.popen3(env, *command)
        stdin.close

        # Reader: turns each JSON line from the pipe into a queue message and
        # enqueues "exit" once the pipe reaches end of file.
        @threads << Thread.new do
          File.open(tmp_filename) do |fd|
            fd.each_line do |line|
              message = JSON.parse(line)
              message = message.symbolize_keys
              message[:process_id] = process_id
              message[:command_string] = command_string
              @messages << message
            end
          end

          @messages << { type: "exit", process_id: process_id }
        end

        @threads << start_copy_thread(stdout, STDOUT)
        @threads << start_copy_thread(stderr, STDERR)

        # Watcher: reports a non-zero exit status of the subprocess.
        @threads << Thread.new { @messages << { type: "error" } if wait_thr.value.exitstatus != 0 }
      end
    end

    # Starts a thread that copies everything readable from `src` to `dst` in
    # chunks of up to 4096 bytes, closing `src` at end of file.
    #
    # @return [Thread]
    def start_copy_thread(src, dst)
      Thread.new do
        loop do
          begin
            msg = src.readpartial(4096)
          rescue EOFError
            src.close
            break
          else
            dst.write(msg)
          end
        end
      end
    end

    # Consumes queue messages and forwards them to the reporter until every
    # subprocess (the regular ones plus the multisite one) has reported "exit",
    # or until the fail-fast limit is reached, in which case all threads are
    # killed. An Interrupt ends the processing without raising.
    def handle_messages
      exited = 0

      begin
        loop do
          message = @messages.pop
          case message[:type]
          when "example_passed"
            @reporter.example_passed(build_example(message))
          when "example_pending"
            @reporter.example_pending(build_example(message))
          when "example_failed"
            @reporter.example_failed(build_example(message))
            @failure_count += 1
            if fail_fast_met
              @threads.each(&:kill)
              break
            end
          when "message"
            @reporter.message(message[:message])
          when "seed", "close"
            # Recognised but intentionally ignored.
          when "error"
            @reporter.error_outside_of_examples
            @error = true
          when "exit"
            exited += 1

            if @reporter.formatters.any? { |f| f.is_a?(DocumentationFormatter) }
              @reporter.message("[#{message[:process_id]}] DONE (#{exited}/#{@num_processes + 1})")
            end

            # + 1 accounts for the multisite subprocess.
            break if exited == @num_processes + 1
          else
            STDERR.puts("Unhandled message in main process: #{message}")
          end

          STDOUT.flush
        end
      rescue Interrupt
        # Stop processing messages; the caller carries on with its shutdown.
      end
    end

    # Builds the FakeExample for an example_* message.
    def build_example(message)
      FakeExample.from_obj(
        message[:example],
        process_id: message[:process_id],
        command_string: message[:command_string],
      )
    end

    # Empties the queue and records any "error" message still in it.
    #
    # Must only be called once all producer threads have finished, so that the
    # queue can no longer grow. Messages of other types are discarded.
    def collect_late_errors
      until @messages.empty?
        message = @messages.pop(true)
        @error = true if message[:type] == "error"
      end
    end

    # @return [Boolean] true when a fail-fast limit is set and the number of
    #   failed examples has reached it
    def fail_fast_met
      !@fail_fast.nil? && @failure_count >= @fail_fast
    end
  end
end