Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0cc500af204a8ba34580f52596926d60
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
732173f2bdcfd9f1cebb69d1464ec3d6d6729b34268cbe622da86af7e8e09531b1ca7fee14ceeb3246ae03f2874c6716a896cbbf9132b3f660d68bd481d24c05

This file was deleted.

This file was deleted.

4 changes: 2 additions & 2 deletions stdlib/Distributed.version
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DISTRIBUTED_BRANCH = master
DISTRIBUTED_SHA1 = cd9219573d736b036077dff3cadddf369516d495
DISTRIBUTED_BRANCH = ib/worker_output_customization
DISTRIBUTED_SHA1 = 94777c0ef77990ee7e1a775653c8bb34237ae35a
DISTRIBUTED_GIT_URL := https://github.com/JuliaLang/Distributed.jl
DISTRIBUTED_TAR_URL = https://api.github.com/repos/JuliaLang/Distributed.jl/tarball/$1
2 changes: 1 addition & 1 deletion test/embedding/embedding-test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ end
@test lines[4] == "sqrt(2.0) in C: 1.414214e+00"
@test lines[9] == "called bar"
@test lines[10] == "calling new bar"
@test lines[11] == " From worker 2:\tTaking over the world..."
@test lines[11] == " From worker 2:\tTaking over the world..."
@test "exception caught from C" in readlines(err)
end
60 changes: 58 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,39 @@ include("buildkitetestjson.jl")
const longrunning_delay = parse(Int, get(ENV, "JULIA_TEST_LONGRUNNING_DELAY", "45")) * 60 # minutes
const longrunning_interval = parse(Int, get(ENV, "JULIA_TEST_LONGRUNNING_INTERVAL", "15")) * 60 # minutes

# Helper to run code with prefixed output (uses Pipe + background reader)
function with_output_prefix(f, prefix::String, io::IO, lock::ReentrantLock)
pipe = Pipe()
Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)

reader_task = @async begin
try
while isopen(pipe) || bytesavailable(pipe) > 0
line = readline(pipe; keep=true)
isempty(line) && break
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this check for?

@lock lock begin
printstyled(io, " ", prefix, ": ", color=:light_black)
print(io, line)
endswith(line, '\n') || println(io)
end
end
catch e
e isa EOFError || rethrow()
end
end

try
redirect_stdout(pipe) do
redirect_stderr(pipe) do
f()
end
end
Comment on lines +39 to +43
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpicking:

Suggested change
redirect_stdout(pipe) do
redirect_stderr(pipe) do
f()
end
end
redirect_stdio(; stdout=pipe, stderr=pipe) do
f()
end

finally
close(pipe.in)
wait(reader_task)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably happens automatically through the finalizer but just to be safe:

Suggested change
wait(reader_task)
wait(reader_task)
close(pipe)

end
end

(; tests, net_on, exit_on_error, use_revise, buildroot, seed) = choosetests(ARGS)
tests = unique(tests)

Expand Down Expand Up @@ -123,6 +156,9 @@ cd(@__DIR__) do
end
skipped = 0

# Track which test is running on each worker (worker_id => test_name)
worker_current_test = Dict{Int, String}()

@everywhere include("testdefs.jl")

if use_revise
Expand Down Expand Up @@ -160,6 +196,21 @@ cd(@__DIR__) do
stderr.lock = print_lock
end

# Set up hook to display test name with worker output
Distributed.worker_output_hook[] = (ident, line) -> begin
wrkr_id = tryparse(Int, ident)
test_name = wrkr_id === nothing ? nothing : get(worker_current_test, wrkr_id, nothing)
@lock print_lock begin
if test_name !== nothing
printstyled(" ", test_name, " (", ident, "): ", color=:light_black)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I think I now see why you wanted the callback to control printing 😅 Could we maybe use StyledStrings for this? Would that handle printing stuff correctly to an IO that may or may not support colors?

else
printstyled(" From worker ", ident, ": ", color=:light_black)
end
println(line)
end
return true
end

function print_testworker_stats(test, wrkr, resp)
@nospecialize resp
lock(print_lock)
Expand Down Expand Up @@ -268,6 +319,7 @@ cd(@__DIR__) do
test = popfirst!(tests)
running_tests[test] = now()
wrkr = p
worker_current_test[wrkr] = test

# Create a timer for this test to report long-running status
test_timers[test] = Timer(longrunning_delay, interval=longrunning_interval) do timer
Expand Down Expand Up @@ -304,6 +356,7 @@ cd(@__DIR__) do
Any[CapturedException(e, catch_backtrace())], time() - before
end
delete!(running_tests, test)
delete!(worker_current_test, wrkr)
if haskey(test_timers, test)
close(test_timers[test])
delete!(test_timers, test)
Expand Down Expand Up @@ -361,8 +414,10 @@ cd(@__DIR__) do
t == "SharedArrays" && (isolate = false)
before = time()
resp, duration = try
r = @invokelatest runtests(t, test_path(t), isolate, seed=seed) # runtests is defined by the include above
r, time() - before
with_output_prefix("$t (1)", stdout, print_lock) do
r = @invokelatest runtests(t, test_path(t), isolate, seed=seed) # runtests is defined by the include above
r, time() - before
end
catch e
isa(e, InterruptException) && rethrow()
Any[CapturedException(e, catch_backtrace())], time() - before
Expand Down Expand Up @@ -395,6 +450,7 @@ cd(@__DIR__) do
if @isdefined test_timers
foreach(close, values(test_timers))
end
Distributed.worker_output_hook[] = nothing
end

#=
Expand Down