@@ -8,10 +8,11 @@ defmodule File.Stream do
88 * `modes` - the file modes
99 * `raw` - a boolean indicating if bin functions should be used
1010 * `line_or_bytes` - if reading should read lines or a given number of bytes
11+ * `node` - the node the file belongs to
1112
1213 """
1314
14- defstruct path: nil , modes: [ ] , line_or_bytes: :line , raw: true
15+ defstruct path: nil , modes: [ ] , line_or_bytes: :line , raw: true , node: nil
1516
1617 @ type t :: % __MODULE__ { }
1718
@@ -32,19 +33,29 @@ defmodule File.Stream do
3233 modes
3334 end
3435
35- % File.Stream { path: path , modes: modes , raw: raw , line_or_bytes: line_or_bytes }
36+ % File.Stream { path: path , modes: modes , raw: raw , line_or_bytes: line_or_bytes , node: node ( ) }
37+ end
38+
39+ @ doc false
40+ def __open__ ( % File.Stream { path: path , node: node } , modes ) when node == node ( ) do
41+ :file . open ( path , modes )
42+ end
43+
44+ @ doc false
45+ def __open__ ( % File.Stream { path: path , node: node } , modes ) do
46+ :erpc . call ( node , :file_io_server , :start , [ self ( ) , path , List . delete ( modes , :raw ) ] )
3647 end
3748
3849 defimpl Collectable do
39- def into ( % { path: path , modes: modes , raw: raw } = stream ) do
50+ def into ( % { modes: modes , raw: raw } = stream ) do
4051 modes = for mode <- modes , mode not in [ :read ] , do: mode
4152
42- case :file . open ( path , [ :write | modes ] ) do
53+ case File.Stream . __open__ ( stream , [ :write | modes ] ) do
4354 { :ok , device } ->
4455 { :ok , into ( device , stream , raw ) }
4556
4657 { :error , reason } ->
47- raise File.Error , reason: reason , action: "stream" , path: path
58+ raise File.Error , reason: reason , action: "stream" , path: stream . path
4859 end
4960 end
5061
@@ -73,14 +84,14 @@ defmodule File.Stream do
7384 defimpl Enumerable do
7485 @ read_ahead_size 64 * 1024
7586
76- def reduce ( % { path: path , modes: modes , line_or_bytes: line_or_bytes , raw: raw } , acc , fun ) do
87+ def reduce ( % { modes: modes , line_or_bytes: line_or_bytes , raw: raw } = stream , acc , fun ) do
7788 start_fun = fn ->
78- case :file . open ( path , read_modes ( modes ) ) do
89+ case File.Stream . __open__ ( stream , read_modes ( modes ) ) do
7990 { :ok , device } ->
8091 if :trim_bom in modes , do: trim_bom ( device , raw ) |> elem ( 0 ) , else: device
8192
8293 { :error , reason } ->
83- raise File.Error , reason: reason , action: "stream" , path: path
94+ raise File.Error , reason: reason , action: "stream" , path: stream . path
8495 end
8596 end
8697
@@ -93,27 +104,20 @@ defmodule File.Stream do
93104 Stream . resource ( start_fun , next_fun , & :file . close / 1 ) . ( acc , fun )
94105 end
95106
96- def count ( % { path: path , modes: modes , line_or_bytes: :line } = stream ) do
107+ def count ( % { modes: modes , line_or_bytes: :line , path: path } = stream ) do
97108 pattern = :binary . compile_pattern ( "\n " )
98109 counter = & count_lines ( & 1 , path , pattern , read_function ( stream ) , 0 )
99-
100- case File . open ( path , read_modes ( modes ) , counter ) do
101- { :ok , count } ->
102- { :ok , count }
103-
104- { :error , reason } ->
105- raise File.Error , reason: reason , action: "stream" , path: path
106- end
110+ { :ok , open! ( stream , modes , counter ) }
107111 end
108112
109- def count ( % { path: path , line_or_bytes: bytes , raw: true , modes: modes } ) do
110- case File . stat ( path ) do
113+ def count ( % { path: path , line_or_bytes: bytes , raw: true , modes: modes , node: node } = stream ) do
114+ case :erpc . call ( node , File , :stat , [ path ] ) do
111115 { :ok , % { size: 0 } } ->
112116 { :error , __MODULE__ }
113117
114118 { :ok , % { size: size } } ->
115119 remainder = if rem ( size , bytes ) == 0 , do: 0 , else: 1
116- { :ok , div ( size , bytes ) + remainder - count_raw_bom ( path , modes ) }
120+ { :ok , div ( size , bytes ) + remainder - count_raw_bom ( stream , modes ) }
117121
118122 { :error , reason } ->
119123 raise File.Error , reason: reason , action: "stream" , path: path
@@ -132,9 +136,23 @@ defmodule File.Stream do
132136 { :error , __MODULE__ }
133137 end
134138
135- defp count_raw_bom ( path , modes ) do
139+ defp open! ( stream , modes , fun ) do
140+ case File.Stream . __open__ ( stream , read_modes ( modes ) ) do
141+ { :ok , device } ->
142+ try do
143+ fun . ( device )
144+ after
145+ :file . close ( device )
146+ end
147+
148+ { :error , reason } ->
149+ raise File.Error , reason: reason , action: "stream" , path: stream . path
150+ end
151+ end
152+
153+ defp count_raw_bom ( stream , modes ) do
136154 if :trim_bom in modes do
137- File . open! ( path , read_modes ( modes ) , & ( & 1 |> trim_bom ( true ) |> elem ( 1 ) ) )
155+ open! ( stream , read_modes ( modes ) , & ( & 1 |> trim_bom ( true ) |> elem ( 1 ) ) )
138156 else
139157 0
140158 end
0 commit comments