Skip to content

Commit 95934e5

Browse files
authored
Add $callers tracking to Task (#8510)
This allows us to track which process invoked the Task besides its ancestors in the supervision tree. Closes #7995
1 parent 51187a3 commit 95934e5

File tree

8 files changed

+184
-39
lines changed

8 files changed

+184
-39
lines changed

lib/elixir/lib/task.ex

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,8 @@ defmodule Task do
271271
@spec start_link(module, atom, [term]) :: {:ok, pid}
272272
def start_link(module, function_name, args)
273273
when is_atom(module) and is_atom(function_name) and is_list(args) do
274-
self()
275-
|> get_owner()
276-
|> Task.Supervised.start_link({module, function_name, args})
274+
mfa = {module, function_name, args}
275+
Task.Supervised.start_link(get_owner(self()), get_callers(self()), mfa)
277276
end
278277

279278
@doc """
@@ -300,9 +299,8 @@ defmodule Task do
300299
@spec start(module, atom, [term]) :: {:ok, pid}
301300
def start(module, function_name, args)
302301
when is_atom(module) and is_atom(function_name) and is_list(args) do
303-
self()
304-
|> get_owner()
305-
|> Task.Supervised.start({module, function_name, args})
302+
mfa = {module, function_name, args}
303+
Task.Supervised.start(get_owner(self()), get_callers(self()), mfa)
306304
end
307305

308306
@doc """
@@ -391,7 +389,7 @@ defmodule Task do
391389
when is_atom(module) and is_atom(function_name) and is_list(args) do
392390
mfa = {module, function_name, args}
393391
owner = self()
394-
{:ok, pid} = Task.Supervised.start_link(get_owner(owner), :nomonitor, mfa)
392+
{:ok, pid} = Task.Supervised.start_link(get_owner(owner), get_callers(owner), :nomonitor, mfa)
395393
ref = Process.monitor(pid)
396394
send(pid, {owner, ref})
397395
%Task{pid: pid, ref: ref, owner: owner}
@@ -498,8 +496,8 @@ defmodule Task do
498496
end
499497

500498
defp build_stream(enumerable, fun, options) do
501-
&Task.Supervised.stream(enumerable, &1, &2, fun, options, fn owner, mfa ->
502-
{:ok, pid} = Task.Supervised.start_link(get_owner(owner), :nomonitor, mfa)
499+
&Task.Supervised.stream(enumerable, &1, &2, fun, options, fn [owner | _] = callers, mfa ->
500+
{:ok, pid} = Task.Supervised.start_link(get_owner(owner), callers, :nomonitor, mfa)
503501
{:ok, :link, pid}
504502
end)
505503
end
@@ -517,6 +515,13 @@ defmodule Task do
517515
{node(), self_or_name, pid}
518516
end
519517

518+
defp get_callers(owner) do
519+
case :erlang.get(:"$callers") do
520+
[_ | _] = list -> [owner | list]
521+
_ -> [owner]
522+
end
523+
end
524+
520525
@doc """
521526
Awaits a task reply and returns it.
522527

lib/elixir/lib/task/supervised.ex

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,21 @@ defmodule Task.Supervised do
22
@moduledoc false
33
@ref_timeout 5000
44

5-
def start(owner, fun) do
6-
{:ok, :proc_lib.spawn(__MODULE__, :noreply, [owner, fun])}
5+
def start(owner, callers, fun) do
6+
{:ok, :proc_lib.spawn(__MODULE__, :noreply, [owner, callers, fun])}
77
end
88

9-
def start_link(owner, fun) do
10-
{:ok, :proc_lib.spawn_link(__MODULE__, :noreply, [owner, fun])}
9+
def start_link(owner, callers, fun) do
10+
{:ok, :proc_lib.spawn_link(__MODULE__, :noreply, [owner, callers, fun])}
1111
end
1212

13-
def start_link(owner, monitor, fun) do
14-
{:ok, :proc_lib.spawn_link(__MODULE__, :reply, [owner, monitor, fun])}
13+
def start_link(owner, callers, monitor, fun) do
14+
{:ok, :proc_lib.spawn_link(__MODULE__, :reply, [owner, callers, monitor, fun])}
1515
end
1616

17-
def reply({_, _, owner_pid} = owner, monitor, mfa) do
17+
def reply({_, _, owner_pid} = owner, callers, monitor, mfa) do
1818
initial_call(mfa)
19+
put_callers(callers)
1920

2021
case monitor do
2122
:monitor ->
@@ -31,7 +32,7 @@ defmodule Task.Supervised do
3132
receive do
3233
{^owner_pid, ref} ->
3334
_ = if mref, do: Process.demonitor(mref, [:flush])
34-
send(owner_pid, {ref, do_apply(owner, mfa)})
35+
send(owner_pid, {ref, invoke_mfa(owner, mfa)})
3536

3637
{:DOWN, ^mref, _, _, reason} when is_reference(mref) ->
3738
exit({:shutdown, reason})
@@ -60,9 +61,14 @@ defmodule Task.Supervised do
6061
end
6162
end
6263

63-
def noreply(owner, mfa) do
64+
def noreply(owner, callers, mfa) do
6465
initial_call(mfa)
65-
do_apply(owner, mfa)
66+
put_callers(callers)
67+
invoke_mfa(owner, mfa)
68+
end
69+
70+
defp put_callers(callers) do
71+
Process.put(:"$callers", callers)
6672
end
6773

6874
defp initial_call(mfa) do
@@ -79,7 +85,7 @@ defmodule Task.Supervised do
7985
{mod, fun, length(args)}
8086
end
8187

82-
defp do_apply(owner, {module, fun, args} = mfa) do
88+
defp invoke_mfa(owner, {module, fun, args} = mfa) do
8389
try do
8490
apply(module, fun, args)
8591
catch
@@ -147,6 +153,7 @@ defmodule Task.Supervised do
147153
timeout = Keyword.get(options, :timeout, 5000)
148154
on_timeout = Keyword.get(options, :on_timeout, :exit)
149155
parent = self()
156+
callers = get_callers()
150157

151158
{:trap_exit, trap_exit?} = Process.info(self(), :trap_exit)
152159

@@ -156,7 +163,10 @@ defmodule Task.Supervised do
156163
spawn_opts = [:link, :monitor]
157164

158165
{monitor_pid, monitor_ref} =
159-
Process.spawn(fn -> stream_monitor(parent, mfa, spawn, trap_exit?, timeout) end, spawn_opts)
166+
Process.spawn(
167+
fn -> stream_monitor(callers, mfa, spawn, trap_exit?, timeout) end,
168+
spawn_opts
169+
)
160170

161171
# Now that we have the pid of the "monitor" process and the reference of the
162172
# monitor we use to monitor such process, we can inform the monitor process
@@ -183,6 +193,13 @@ defmodule Task.Supervised do
183193
)
184194
end
185195

196+
defp get_callers do
197+
case :erlang.get(:"$callers") do
198+
[_ | _] = list -> [self() | list]
199+
_ -> [self()]
200+
end
201+
end
202+
186203
defp stream_reduce({:halt, acc}, _max, _spawned, _delivered, _waiting, next, config) do
187204
%{monitor_pid: monitor_pid, monitor_ref: monitor_ref, timeout: timeout} = config
188205
stream_close(monitor_pid, monitor_ref, timeout)
@@ -426,9 +443,8 @@ defmodule Task.Supervised do
426443
end
427444
end
428445

429-
defp stream_monitor(parent_pid, mfa, spawn, trap_exit?, timeout) do
446+
defp stream_monitor([parent_pid | _] = callers, mfa, spawn, trap_exit?, timeout) do
430447
Process.flag(:trap_exit, trap_exit?)
431-
432448
parent_ref = Process.monitor(parent_pid)
433449

434450
# Let's wait for the parent process to tell this process the monitor ref
@@ -437,7 +453,7 @@ defmodule Task.Supervised do
437453
receive do
438454
{^parent_pid, monitor_ref} ->
439455
config = %{
440-
parent_pid: parent_pid,
456+
callers: callers,
441457
parent_ref: parent_ref,
442458
mfa: mfa,
443459
spawn: spawn,
@@ -454,7 +470,7 @@ defmodule Task.Supervised do
454470

455471
defp stream_monitor_loop(running_tasks, config) do
456472
%{
457-
parent_pid: parent_pid,
473+
callers: [parent_pid | _] = callers,
458474
mfa: mfa,
459475
spawn: spawn,
460476
monitor_ref: monitor_ref,
@@ -465,7 +481,7 @@ defmodule Task.Supervised do
465481
# The parent process is telling us to spawn a new task to process
466482
# "value". We spawn it and notify the parent about its pid.
467483
{:spawn, position, value} ->
468-
case spawn.(parent_pid, normalize_mfa_with_arg(mfa, value)) do
484+
case spawn.(callers, normalize_mfa_with_arg(mfa, value)) do
469485
{:ok, type, pid} ->
470486
ref = Process.monitor(pid)
471487

lib/elixir/lib/task/supervisor.ex

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ defmodule Task.Supervisor do
395395
def start_child(supervisor, fun, options \\ []) do
396396
restart = options[:restart]
397397
shutdown = options[:shutdown]
398-
args = [get_owner(self()), {:erlang, :apply, [fun, []]}]
398+
args = [get_owner(self()), get_callers(self()), {:erlang, :apply, [fun, []]}]
399399
start_child_with_spec(supervisor, args, restart, shutdown)
400400
end
401401

@@ -411,7 +411,7 @@ defmodule Task.Supervisor do
411411
when is_atom(fun) and is_list(args) do
412412
restart = options[:restart]
413413
shutdown = options[:shutdown]
414-
args = [get_owner(self()), {module, fun, args}]
414+
args = [get_owner(self()), get_callers(self()), {module, fun, args}]
415415
start_child_with_spec(supervisor, args, restart, shutdown)
416416
end
417417

@@ -432,9 +432,16 @@ defmodule Task.Supervisor do
432432
{node(), self_or_name, pid}
433433
end
434434

435+
defp get_callers(owner) do
436+
case :erlang.get(:"$callers") do
437+
[_ | _] = list -> [owner | list]
438+
_ -> [owner]
439+
end
440+
end
441+
435442
defp async(supervisor, link_type, module, fun, args, options) do
436443
owner = self()
437-
args = [get_owner(owner), :monitor, {module, fun, args}]
444+
args = [get_owner(owner), get_callers(owner), :monitor, {module, fun, args}]
438445
shutdown = options[:shutdown]
439446

440447
case start_child_with_spec(supervisor, args, :temporary, shutdown) do
@@ -456,8 +463,8 @@ defmodule Task.Supervisor do
456463
defp build_stream(supervisor, link_type, enumerable, fun, options) do
457464
shutdown = options[:shutdown]
458465

459-
&Task.Supervised.stream(enumerable, &1, &2, fun, options, fn owner, mfa ->
460-
args = [get_owner(owner), :monitor, mfa]
466+
&Task.Supervised.stream(enumerable, &1, &2, fun, options, fn [owner | _] = callers, mfa ->
467+
args = [get_owner(owner), callers, :monitor, mfa]
461468

462469
case start_child_with_spec(supervisor, args, :temporary, shutdown) do
463470
{:ok, pid} ->

lib/elixir/test/elixir/task/supervisor_test.exs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,30 @@ defmodule Task.SupervisorTest do
105105

106106
test "raises when :max_children is reached" do
107107
{:ok, sup} = Task.Supervisor.start_link(max_children: 1)
108-
109108
Task.Supervisor.async(sup, fn -> Process.sleep(:infinity) end)
110109

111110
assert_raise RuntimeError, ~r/reached the maximum number of tasks/, fn ->
112111
Task.Supervisor.async(sup, fn -> :ok end)
113112
end
114113
end
114+
115+
test "with $callers", config do
116+
sup = config[:supervisor]
117+
grandparent = self()
118+
119+
Task.Supervisor.async(sup, fn ->
120+
parent = self()
121+
assert Process.get(:"$callers") == [grandparent]
122+
assert Process.get(:"$ancestors") == [sup, grandparent]
123+
124+
Task.Supervisor.async(sup, fn ->
125+
assert Process.get(:"$callers") == [parent, grandparent]
126+
assert Process.get(:"$ancestors") == [sup, grandparent]
127+
end)
128+
|> Task.await()
129+
end)
130+
|> Task.await()
131+
end
115132
end
116133

117134
test "async/3", config do
@@ -262,6 +279,25 @@ defmodule Task.SupervisorTest do
262279
assert_receive :ready
263280
end
264281

282+
test "start_child/1 with $callers", config do
283+
sup = config[:supervisor]
284+
grandparent = self()
285+
286+
Task.Supervisor.start_child(sup, fn ->
287+
parent = self()
288+
assert Process.get(:"$callers") == [grandparent]
289+
assert Process.get(:"$ancestors") == [sup, grandparent]
290+
291+
Task.Supervisor.start_child(sup, fn ->
292+
assert Process.get(:"$callers") == [parent, grandparent]
293+
assert Process.get(:"$ancestors") == [sup, grandparent]
294+
send(grandparent, :done)
295+
end)
296+
end)
297+
298+
assert_receive :done
299+
end
300+
265301
test "terminate_child/2", config do
266302
args = [self(), :done]
267303

@@ -369,6 +405,27 @@ defmodule Task.SupervisorTest do
369405
assert MapSet.new(links) == MapSet.new([unused_supervisor, supervisor])
370406
refute_received _
371407
end
408+
409+
test "with $callers", config do
410+
sup = config[:supervisor]
411+
grandparent = self()
412+
413+
Task.Supervisor.async_stream(sup, [1], fn 1 ->
414+
parent = self()
415+
assert Process.get(:"$callers") == [grandparent]
416+
assert Process.get(:"$ancestors") == [sup, grandparent]
417+
418+
Task.Supervisor.async_stream(sup, [1], fn 1 ->
419+
assert Process.get(:"$callers") == [parent, grandparent]
420+
assert Process.get(:"$ancestors") == [sup, grandparent]
421+
send(grandparent, :done)
422+
end)
423+
|> Stream.run()
424+
end)
425+
|> Stream.run()
426+
427+
assert_receive :done
428+
end
372429
end
373430

374431
describe "async_stream_nolink" do

0 commit comments

Comments
 (0)