30多列源碼,無 context 操作如儲存還原堆叠等,無需通過 ffi 底層 c 介面擴展。使用者無需手工 yield resume.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | open Fiber
open Fiber_posix
let rec t1 ()=
begin%m
sleep 1.;
printl "t1";
t1 ();
end
let rec t2 ()=
begin%m
sleep 2.;
printl "t2";
t2 ();
end
let main ()=
t1 () |> async;
t2 () |> async;
sleep 4.
let ()= run @@ main ()
|
主纖程睡眠4秒鐘。纖程 t1 每間隔一秒列印“t1”,而纖程 t2 每間隔兩秒列印“t2”。螢幕顯示為:
kandu@bomb:~/fiber$ ./test.native
t1
t1
t2
t1
t1
t2
看到出現在源碼中的 '%m' 擴展標記了吧,讀者肯定猜到此 fiber 實作是需要使用以前說過的 ok_monad 來對使用者程式碼進行 CPS 轉換的了。使用此方式,程式的邏輯就由字面上的函數的控制流轉變來表示了。實作時避免了需要在函數體中切出換入而引入上下文儲存,並且在函數體的控制流轉變時自動帶入了 break point 可進行任務的按需切換了。我們也無需專門寫一個排程器。所以核心程式碼可以十分精簡。
核心的 Fiber 模組
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | type 'a state=
| Return of 'a
| Sleep
type 'a t= {
mutable state: 'a state;
mutable waiters: (unit -> unit) list;
}
let add_waiter t f= t.waiters <- f::t.waiters
let rec link td ts=
td.state <- ts.state;
if ts.state = Sleep then
add_waiter ts (fun ()-> link td ts)
else List.iter (fun f-> f ()) td.waiters
let sleep ()= {state= Sleep; waiters= []}
let wake t state=
t.state <- state;
List.iter (fun f-> f ()) t.waiters
(* user APIs *)
let return v= {state= Return v; waiters= []}
let rec bind t f=
match t.state with
| Return v-> f v
| Sleep->
let combi= {state= Sleep; waiters= []} in
add_waiter t (fun ()-> link combi (bind t f));
combi
let async (thread:'a t)= ignore thread
|
為了接收 event, 需要一些平臺相關的東西。在不同系統下有各自適合的方式。比如 Linux 的 epoll timerfd_create, BSD 的 KQueue, windows 的 IOCP. 現在我們只是想瞭解下原理,為了在各種系統下都可使用。下面給出 POSIX 系統下的 fiber 補充, Fiber_posix 模組(ocaml 的 cygwin 版在 windows 下可用)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | open Unix
open Fiber
let thl= Hashtbl.create 8
let sleep time=
let id= Timer.new_timer {it_interval= 0.; it_value= time}
and thread= {state= Sleep; waiters= []} in
Hashtbl.replace thl id (fun ()-> wake thread (Return ()));
thread
let printl s=
print_endline s;
{state= (Return ()); waiters= []}
let rec run t=
match t.state with
| Return v-> v
| Sleep ->
begin
let (tms,_)= Timer.select [] [] [] (-1.) in
List.iter
(fun tm-> Hashtbl.find thl tm ())
tms;
run t;
end
|
因為 posix 傳統的 timer 只有通過一個 signal 來傳遞,而我們又需要管理很多計時器。實際的話用 ctypes 庫動態繫結幾個系統呼叫就好了。但是此示例為了做到平臺無關性,所以不採用任何 c 擴展和平臺相關 api, 全都用自帶跨平臺庫。所以,也只能自己動手擴展了。 增加一個 Timer 模組。
timer.ml 主要提供了
new_timer: Unix.interval_timer_status -> int
del_timer: int -> unit
select: Unix.file_descr list -> Unix.file_descr list -> Unix.file_descr list -> float
-> int list * (Unix.file_descr list * Unix.file_descr list * Unix.file_descr list)
擴展的 select 的返回值是原先 Unix.select 返回的可讀,可寫,出錯三個檔案描述符列表組成的 tuple 的前面再包上一個 timer id 列表的 tuple。timer id 裡面即為已經到期的計時器。 上述的 Fiber_posix 模組就用到了它
好了,這樣可以寫出並運行上述的示例程式碼了。
別人都在說行程間通訊(IPC),執行緒間通訊(ITC),我們好歹也得搞個 IFC。 於是增加 Fiber_msgBox 模組
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | open Fiber
type 'a msgBox= {
readers: 'a t Queue.t;
writers: (unit t * 'a) Queue.t;
}
let new_msgBox ()= {readers= Queue.create (); writers= Queue.create ()}
let put msgBox msg=
if not (Queue.is_empty msgBox.readers) then
let reader= Queue.take msgBox.readers in
(wake reader (Return msg);
return ())
else
let res= sleep () in
Queue.add (res, msg) msgBox.writers;
res
let take msgBox=
if not (Queue.is_empty msgBox.writers) then
let (writer, msg)= Queue.take msgBox.writers in
wake writer (Return ());
return msg
else
let res= sleep () in
Queue.add res msgBox.readers;
res
|
有個調皮的男孩偷偷去遠足,出發後不斷向憤怒的媽媽發回自己的訊息。小男孩半路不小心傷了腿,最后休息下後慢慢地回了家。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | open Fiber
open Fiber_msgBox
open Fiber_posix
let msgBox= new_msgBox ()
let status= ref "ok"
let rec angryMom ()=
let%m status=take msgBox in
begin%m
(match status with
| "ok" -> printl "mom: thank god"
| _ -> printl "mom: fuck god");
angryMom ();
end
let naughtyBoy ()=
let rec keepTouch ()=
begin%m
sleep 1.0;
put msgBox !status;
keepTouch ();
end
and hurt ()=
begin%m
return (status:= "hurt");
printl "boy: ouch";
sleep 2.;
return (status:= "ok");
return 2.;
end
and hike ()=
let time_goHiking= 2. in
begin%m
printl "hiking";
sleep time_goHiking;
let%m time_rest= hurt () in
begin%m
printl "boy: coming back";
let time_goBackWhileLegsGetHurt= 4. in
begin%m
sleep time_goBackWhileLegsGetHurt;
return @@ time_goHiking +. time_rest +. time_goBackWhileLegsGetHurt;
end
end
end
in
keepTouch () |> async;
hike ()
let main ()=
angryMom () |> async;
let%m time= naughtyBoy () in
printl @@ "spend time " ^ string_of_float time
let ()= run @@ main ()
|
如源碼所述。憤怒的媽媽不斷接收到男孩的訊息,並對此做出了反應。
編譯及運行,螢幕顯示如下:
kandu@bomb:~/fiber$ ls
fiber.ml fiber_msgBox.ml fiber_posix.ml test.ml timer.ml
kandu@bomb:~/fiber$ ocamlbuild -use-ocamlfind -syntax camlp4o -pkg unix -pkg monad-custom test.native
Finished, 16 targets (0 cached) in 00:00:01.
kandu@bomb:~/fiber$ ./test.native
hiking
mom: thank god
mom: thank god
boy: ouch
mom: fuck god
mom: fuck god
boy: coming back
mom: thank god
mom: thank god
mom: thank god
mom: thank god
spend time 8.
我們再來增加一個喜歡偷別人信件讀的小偷看結果會怎麼樣。 增加
let rec thief ()=
let%m msg= take msgBox in
begin%m
printl @@ Printf.sprintf "thief: I've stolen a msg `%s'" msg;
thief ();
end
並將 main 函數增加一列,變成
let main ()=
thief () |> async;
angryMom () |> async;
let%m time= naughtyBoy () in
printl @@ "spend time " ^ string_of_float time
編譯及運行,螢幕顯示如下:
kandu@bomb:~/fiber$ ocamlbuild -use-ocamlfind -syntax camlp4o -pkg unix -pkg monad-custom test.native
Finished, 16 targets (12 cached) in 00:00:00.
kandu@bomb:~/fiber$ ./test.native
hiking
thief: I've stolen a msg `ok'
mom: thank god
boy: ouch
thief: I've stolen a msg `hurt'
mom: fuck god
boy: coming back
thief: I've stolen a msg `ok'
mom: thank god
thief: I've stolen a msg `ok'
mom: thank god
spend time 8.
媽媽的反應少了一半,因為有一半的信件被小偷偷走並讀出來了。
核心部分,還可以給纖程增加 join choose 等操作。等待多個纖程完工再幹活,或者只等到最快結束的纖程給的返回值就繼續。 擴展部分, IO呢,檔案讀寫,網路通訊. Fiber_posix.run 的三個檔案描述符列表還是給的空值喲。
既然已經了解了核心部分,剩下的就由你自己來完成吧。
如果語言本身就提供了語法擴展能力,那就方便了,比如像 lisp, ocaml 這樣的。如果是正常的脚本語言的話,都是可行的。因為正常的脚本語言都提供了所需滿足條件的前兩條。而第三條的話,利用 eval, 運行時編譯就可做到了。只要約定好明顯的記號。比如 haskell 的 do, pa_monad_custom 的 perform 加分號。就很容易進行明確的 CPS 變換啦。云风寫過 让 lua 编译时计算,把這種方法用到轉換上就可以。
為什麼源碼沒有註釋。我忐忑不安地覺得,寫的東西勉強可以算的上「源碼即文件」了吧。當然我也不確信,歡迎給出反饋。 ^.^
下載並安裝 ok_monad
編譯
ocamlbuild -use-ocamlfind -pkg unix -pkg ok_monad test.byte
然後執行 test.byte 就可看結果了。