fiber
[tags: CPS OCaml fiber]

纖程可以怎樣實作及使用?

30多列源碼,無 context 操作如儲存還原堆叠等,無需通過 ffi 底層 c 介面擴展。使用者無需手工 yield resume.

程式語言所需滿足的條件

  1. 垃圾回收
  2. 尾遞迴優化
  3. 可進行語法擴展

OCaml 使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
open Fiber
open Fiber_posix

let rec t1 ()=
  begin%m
    sleep 1.;
    printl "t1";
    t1 ();
  end

let rec t2 ()=
  begin%m
    sleep 2.;
    printl "t2";
    t2 ();
  end

let main ()=
    t1 () |> async;
    t2 () |> async;
    sleep 4.

let ()= run @@ main ()

主纖程睡眠4秒鐘。纖程 t1 每間隔一秒列印“t1”,而纖程 t2 每間隔兩秒列印“t2”。螢幕顯示為:

kandu@bomb:~/fiber$ ./test.native 
t1
t1
t2
t1
t1
t2

看到出現在源碼中的 '%m' 擴展標記了吧,讀者肯定猜到此 fiber 實作是需要使用以前說過的 ok_monad 來對使用者程式碼進行 CPS 轉換的了。使用此方式,程式的邏輯就由字面上的函數的控制流轉變來表示了。實作時避免了需要在函數體中切出換入而引入上下文儲存,並且在函數體的控制流轉變時自動帶入了 break point 可進行任務的按需切換了。我們也無需專門寫一個排程器。所以核心程式碼可以十分精簡。

核心源碼

核心的 Fiber 模組

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type 'a state=
  | Return of 'a
  | Sleep

type 'a t= {
  mutable state: 'a state;
  mutable waiters: (unit -> unit) list;
}

let add_waiter t f= t.waiters <- f::t.waiters

let rec link td ts=
  td.state <- ts.state;
  if ts.state = Sleep then
    add_waiter ts (fun ()-> link td ts)
  else List.iter (fun f-> f ()) td.waiters

let sleep ()= {state= Sleep; waiters= []}
let wake t state=
  t.state <- state;
  List.iter (fun f-> f ()) t.waiters

(* user APIs *)

let return v= {state= Return v; waiters= []}

let rec bind t f=
  match t.state with
  | Return v-> f v
  | Sleep->
    let combi= {state= Sleep; waiters= []} in
    add_waiter t (fun ()-> link combi (bind t f));
    combi

let async (thread:'a t)= ignore thread

平臺相關

為了接收 event, 需要一些平臺相關的東西。在不同系統下有各自適合的方式。比如 Linux 的 epoll timerfd_create, BSD 的 KQueue, windows 的 IOCP. 現在我們只是想瞭解下原理,為了在各種系統下都可使用。下面給出 POSIX 系統下的 fiber 補充, Fiber_posix 模組(ocaml 的 cygwin 版在 windows 下可用)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
open Unix
open Fiber

let thl= Hashtbl.create 8

let sleep time=
  let id= Timer.new_timer {it_interval= 0.; it_value= time}
  and thread= {state= Sleep; waiters= []} in
  Hashtbl.replace thl id (fun ()-> wake thread (Return ()));
  thread

let printl s=
  print_endline s;
  {state= (Return ()); waiters= []}

let rec run t=
  match t.state with
  | Return v-> v
  | Sleep ->
    begin
      let (tms,_)= Timer.select [] [] [] (-1.) in
      List.iter
        (fun tm-> Hashtbl.find thl tm ())
        tms;
      run t;
    end

因為 posix 傳統的 timer 只有通過一個 signal 來傳遞,而我們又需要管理很多計時器。實際的話用 ctypes 庫動態繫結幾個系統呼叫就好了。但是此示例為了做到平臺無關性,所以不採用任何 c 擴展和平臺相關 api, 全都用自帶跨平臺庫。所以,也只能自己動手擴展了。 增加一個 Timer 模組。

timer.ml 主要提供了

new_timer: Unix.interval_timer_status -> int
del_timer: int -> unit
select: Unix.file_descr list -> Unix.file_descr list -> Unix.file_descr list -> float
  -> int list * (Unix.file_descr list * Unix.file_descr list * Unix.file_descr list)

擴展的 select 的返回值是原先 Unix.select 返回的可讀,可寫,出錯三個檔案描述符列表組成的 tuple 的前面再包上一個 timer id 列表的 tuple。timer id 裡面即為已經到期的計時器。 上述的 Fiber_posix 模組就用到了它

完工了

好了,這樣可以寫出並運行上述的示例程式碼了。

其實還沒完

別人都在說行程間通訊(IPC),執行緒間通訊(ITC),我們好歹也得搞個 IFC。 於是增加 Fiber_msgBox 模組

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
open Fiber

type 'a msgBox= {
  readers: 'a t Queue.t;
  writers: (unit t * 'a) Queue.t;
}

let new_msgBox ()= {readers= Queue.create (); writers= Queue.create ()}

let put msgBox msg=
  if not (Queue.is_empty msgBox.readers) then
    let reader= Queue.take msgBox.readers in
    (wake reader (Return msg);
    return ())
  else
    let res= sleep () in
    Queue.add (res, msg) msgBox.writers;
    res

let take msgBox=
  if not (Queue.is_empty msgBox.writers) then
    let (writer, msg)= Queue.take msgBox.writers in
    wake writer (Return ());
    return msg
  else
    let res= sleep () in
    Queue.add res msgBox.readers;
    res

我們來講個故事

有個調皮的男孩偷偷去遠足,出發後不斷向憤怒的媽媽發回自己的訊息。小男孩半路不小心傷了腿,最后休息下後慢慢地回了家。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
open Fiber
open Fiber_msgBox
open Fiber_posix

let msgBox= new_msgBox ()
let status= ref "ok"

let rec angryMom ()=
  let%m status=take msgBox in
  begin%m
    (match status with
    | "ok" -> printl "mom: thank god"
    | _ -> printl "mom: fuck god");
    angryMom ();
  end

let naughtyBoy ()=
  let rec keepTouch ()=
    begin%m
      sleep 1.0;
      put msgBox !status;
      keepTouch ();
    end
  and hurt ()=
    begin%m
      return (status:= "hurt");
      printl "boy: ouch";
      sleep 2.;
      return (status:= "ok");
      return 2.;
    end
  and hike ()=
    let time_goHiking= 2. in
    begin%m
      printl "hiking";
      sleep time_goHiking;
      let%m time_rest= hurt () in
      begin%m
        printl "boy: coming back";
        let time_goBackWhileLegsGetHurt= 4. in
        begin%m
          sleep time_goBackWhileLegsGetHurt;
          return @@ time_goHiking +. time_rest +. time_goBackWhileLegsGetHurt;
        end
      end
    end
  in 
  keepTouch () |> async;
  hike ()


let main ()=
  angryMom () |> async;
  let%m time= naughtyBoy () in
  printl @@ "spend time " ^ string_of_float time

let ()= run @@ main ()

如源碼所述。憤怒的媽媽不斷接收到男孩的訊息,並對此做出了反應。

編譯及運行,螢幕顯示如下:

kandu@bomb:~/fiber$ ls
fiber.ml  fiber_msgBox.ml  fiber_posix.ml  test.ml  timer.ml
kandu@bomb:~/fiber$ ocamlbuild -use-ocamlfind -syntax camlp4o -pkg unix -pkg monad-custom test.native
Finished, 16 targets (0 cached) in 00:00:01.
kandu@bomb:~/fiber$ ./test.native 
hiking
mom: thank god
mom: thank god
boy: ouch
mom: fuck god
mom: fuck god
boy: coming back
mom: thank god
mom: thank god
mom: thank god
mom: thank god
spend time 8.

我們再來增加一個喜歡偷別人信件讀的小偷看結果會怎麼樣。 增加

let rec thief ()=
  let%m msg= take msgBox in
  begin%m
    printl @@ Printf.sprintf "thief: I've stolen a msg `%s'" msg;
    thief ();
  end

並將 main 函數增加一列,變成

let main ()=
  thief () |> async;
  angryMom () |> async;
  let%m time= naughtyBoy () in
  printl @@ "spend time " ^ string_of_float time

編譯及運行,螢幕顯示如下:

kandu@bomb:~/fiber$ ocamlbuild -use-ocamlfind -syntax camlp4o -pkg unix -pkg monad-custom test.native
Finished, 16 targets (12 cached) in 00:00:00.
kandu@bomb:~/fiber$ ./test.native 
hiking
thief: I've stolen a msg `ok'
mom: thank god
boy: ouch
thief: I've stolen a msg `hurt'
mom: fuck god
boy: coming back
thief: I've stolen a msg `ok'
mom: thank god
thief: I've stolen a msg `ok'
mom: thank god
spend time 8.

媽媽的反應少了一半,因為有一半的信件被小偷偷走並讀出來了。

真的還沒完

核心部分,還可以給纖程增加 join choose 等操作。等待多個纖程完工再幹活,或者只等到最快結束的纖程給的返回值就繼續。 擴展部分, IO呢,檔案讀寫,網路通訊. Fiber_posix.run 的三個檔案描述符列表還是給的空值喲。

既然已經了解了核心部分,剩下的就由你自己來完成吧。

語法擴展呢

如果語言本身就提供了語法擴展能力,那就方便了,比如像 lisp, ocaml 這樣的。如果是正常的脚本語言的話,都是可行的。因為正常的脚本語言都提供了所需滿足條件的前兩條。而第三條的話,利用 eval, 運行時編譯就可做到了。只要約定好明顯的記號。比如 haskell 的 do, pa_monad_custom 的 perform 加分號。就很容易進行明確的 CPS 變換啦。云风寫過 让 lua 编译时计算,把這種方法用到轉換上就可以。

註釋

為什麼源碼沒有註釋。我忐忑不安地覺得,寫的東西勉強可以算的上「源碼即文件」了吧。當然我也不確信,歡迎給出反饋。 ^.^

依賴,運行

下載並安裝 ok_monad

編譯

ocamlbuild -use-ocamlfind -pkg unix -pkg ok_monad test.byte

然後執行 test.byte 就可看結果了。