release-queue #2
169
lib/lwd/lwd.ml
169
lib/lwd/lwd.ml
|
@ -227,8 +227,15 @@ let invalidate = function
|
|||
end
|
||||
| _ -> assert false
|
||||
|
||||
type release_list =
|
||||
| Release_done
|
||||
| Release_more :
|
||||
{ origin : 'a t; element : 'b t; next : release_list } -> release_list
|
||||
|
||||
type release_queue = release_list ref
|
||||
let make_release_queue () = ref Release_done
|
||||
|
||||
type release_failure = exn * Printexc.raw_backtrace
|
||||
exception Release_failure of release_failure list
|
||||
|
||||
(* [sub_release [] origin self] is called when [origin] is released,
|
||||
where [origin] is reachable from [self]'s trace.
|
||||
|
@ -418,61 +425,58 @@ let activate_tracing self origin = function
|
|||
[sub_sample] raise if any user-provided computation raises.
|
||||
Graph will be left in a coherent state but exception will be propagated
|
||||
to the observer. *)
|
||||
let rec sub_sample : type a b . a t -> b t -> b = fun origin ->
|
||||
function
|
||||
| Root _ -> assert false
|
||||
| Pure x | Impure x -> x
|
||||
| Operator t as self ->
|
||||
(* try to use cached value, if present *)
|
||||
match t.value with
|
||||
| Eval_some value -> value
|
||||
| _ ->
|
||||
t.value <- Eval_progress;
|
||||
let result : b = match t.desc with
|
||||
| Map (x, f) -> f (sub_sample self x)
|
||||
| Map2 (x, y, f) -> f (sub_sample self x) (sub_sample self y)
|
||||
| Pair (x, y) -> (sub_sample self x, sub_sample self y)
|
||||
| App (f, x) -> (sub_sample self f) (sub_sample self x)
|
||||
| Join x ->
|
||||
let old_intermediate = x.intermediate in
|
||||
let intermediate =
|
||||
(* We haven't touched any state yet,
|
||||
it is safe for [sub_sample] to raise *)
|
||||
sub_sample self x.child
|
||||
in
|
||||
x.intermediate <- Some intermediate;
|
||||
sub_acquire self intermediate;
|
||||
let result = sub_sample self intermediate in
|
||||
begin match old_intermediate with
|
||||
| None -> result
|
||||
| Some x' ->
|
||||
(* NOTE: if [intermediate==x'], should we stop there? *)
|
||||
(* release old value [x'], catching potential exceptions *)
|
||||
match sub_release [] self x' with
|
||||
| [] -> result
|
||||
| failures ->
|
||||
(* Commit result, just like normal continuation *)
|
||||
begin match t.value with
|
||||
| Eval_progress -> t.value <- Eval_some result
|
||||
| Eval_none | Eval_some _ -> ()
|
||||
end;
|
||||
activate_tracing self origin t.trace;
|
||||
(* Raise release exception *)
|
||||
raise (Release_failure failures)
|
||||
end
|
||||
| Var x -> x.binding
|
||||
| Prim t -> t.acquire ()
|
||||
in
|
||||
begin match t.value with
|
||||
| Eval_progress -> t.value <- Eval_some result;
|
||||
| Eval_none | Eval_some _ -> ()
|
||||
end;
|
||||
(* [self] just became active, so it may invalidate [origin] in case its
|
||||
value changes because of [t.desc], like if it's a variable and gets
|
||||
mutated, or if it's a primitive that gets invalidated.
|
||||
We need to put [origin] into [self.trace] in case it isn't there yet. *)
|
||||
activate_tracing self origin t.trace;
|
||||
result
|
||||
let sub_sample queue =
|
||||
let rec aux : type a b . a t -> b t -> b = fun origin ->
|
||||
function
|
||||
| Root _ -> assert false
|
||||
| Pure x | Impure x -> x
|
||||
| Operator t as self ->
|
||||
(* try to use cached value, if present *)
|
||||
match t.value with
|
||||
| Eval_some value -> value
|
||||
| _ ->
|
||||
t.value <- Eval_progress;
|
||||
let result : b = match t.desc with
|
||||
| Map (x, f) -> f (aux self x)
|
||||
| Map2 (x, y, f) -> f (aux self x) (aux self y)
|
||||
| Pair (x, y) -> (aux self x, aux self y)
|
||||
| App (f, x) -> (aux self f) (aux self x)
|
||||
| Join x ->
|
||||
let intermediate =
|
||||
(* We haven't touched any state yet,
|
||||
it is safe for [aux] to raise *)
|
||||
aux self x.child
|
||||
in
|
||||
begin match x.intermediate with
|
||||
| None ->
|
||||
x.intermediate <- Some intermediate;
|
||||
sub_acquire self intermediate;
|
||||
| Some x' when x' != intermediate ->
|
||||
queue := Release_more {
|
||||
origin = self;
|
||||
element = x';
|
||||
next = !queue;
|
||||
};
|
||||
x.intermediate <- Some intermediate;
|
||||
sub_acquire self intermediate;
|
||||
| Some _ -> ()
|
||||
end;
|
||||
aux self intermediate
|
||||
| Var x -> x.binding
|
||||
| Prim t -> t.acquire ()
|
||||
in
|
||||
begin match t.value with
|
||||
| Eval_progress -> t.value <- Eval_some result;
|
||||
| Eval_none | Eval_some _ -> ()
|
||||
end;
|
||||
(* [self] just became active, so it may invalidate [origin] in case its
|
||||
value changes because of [t.desc], like if it's a variable and gets
|
||||
mutated, or if it's a primitive that gets invalidated.
|
||||
We need to put [origin] into [self.trace] in case it isn't there yet. *)
|
||||
activate_tracing self origin t.trace;
|
||||
result
|
||||
in
|
||||
aux
|
||||
|
||||
type 'a root = 'a t
|
||||
|
||||
|
@ -486,7 +490,42 @@ let observe ?(on_invalidate=ignore) child : _ root =
|
|||
} in
|
||||
root
|
||||
|
||||
let sample = function
|
||||
exception Release_failure of exn option * release_failure list
|
||||
|
||||
let raw_flush_release_queue queue =
|
||||
let rec aux failures = function
|
||||
| Release_done -> failures
|
||||
| Release_more t ->
|
||||
let failures = sub_release failures t.origin t.element in
|
||||
aux failures t.next
|
||||
in
|
||||
aux [] queue
|
||||
|
||||
let flush_release_queue queue =
|
||||
let queue' = !queue in
|
||||
queue := Release_done;
|
||||
raw_flush_release_queue queue'
|
||||
|
||||
let flush_or_fail main_exn queue =
|
||||
match raw_flush_release_queue queue with
|
||||
| [] -> ()
|
||||
| failures -> raise (Release_failure (main_exn, failures))
|
||||
|
||||
let start_sub_sample queue self child =
|
||||
let queue, internal_queue = match queue with
|
||||
| None -> (ref Release_done, true)
|
||||
| Some queue -> (queue, false)
|
||||
in
|
||||
match sub_sample queue self child with
|
||||
| result ->
|
||||
if internal_queue then
|
||||
flush_or_fail None !queue;
|
||||
result
|
||||
| exception exn when internal_queue ->
|
||||
flush_or_fail (Some exn) !queue;
|
||||
raise exn
|
||||
|
||||
let sample ?release_queue = function
|
||||
| Pure _ | Impure _ | Operator _ -> assert false
|
||||
| Root t as self ->
|
||||
match t.value with
|
||||
|
@ -498,7 +537,7 @@ let sample = function
|
|||
sub_acquire self t.child;
|
||||
);
|
||||
t.value <- Eval_progress;
|
||||
let value = sub_sample self t.child in
|
||||
let value = start_sub_sample release_queue self t.child in
|
||||
begin match t.value with
|
||||
| Eval_progress -> t.value <- Eval_some value; (* cache value *)
|
||||
| Eval_none | Eval_some _ -> ()
|
||||
|
@ -510,16 +549,22 @@ let is_damaged = function
|
|||
| Root {value = Eval_some _; _} -> false
|
||||
| Root {value = Eval_none | Eval_progress; _} -> true
|
||||
|
||||
let release = function
|
||||
let release ?release_queue = function
|
||||
| Pure _ | Impure _ | Operator _ -> assert false
|
||||
| Root t as self ->
|
||||
if t.acquired then (
|
||||
(* release subtree, remove cached value *)
|
||||
t.value <- Eval_none;
|
||||
t.acquired <- false;
|
||||
match sub_release [] self t.child with
|
||||
| [] -> ()
|
||||
| failures -> raise (Release_failure failures)
|
||||
begin match release_queue with
|
||||
| Some batch ->
|
||||
batch :=
|
||||
Release_more { origin = self; element = t.child; next = !batch }
|
||||
| None ->
|
||||
match sub_release [] self t.child with
|
||||
| [] -> ()
|
||||
| failures -> raise (Release_failure (None, failures))
|
||||
end
|
||||
)
|
||||
|
||||
let set_on_invalidate x f =
|
||||
|
|
|
@ -84,8 +84,14 @@ val prim : acquire:(unit -> 'a) -> release:('a -> unit) -> 'a prim
|
|||
val get_prim : 'a prim -> 'a t
|
||||
val invalidate : 'a prim -> unit
|
||||
|
||||
(** Releasing unused graphs *)
|
||||
type release_failure = exn * Printexc.raw_backtrace
|
||||
exception Release_failure of release_failure list
|
||||
|
||||
exception Release_failure of exn option * release_failure list
|
||||
|
||||
type release_queue
|
||||
val make_release_queue : unit -> release_queue
|
||||
val flush_release_queue : release_queue -> release_failure list
|
||||
|
||||
type 'a root
|
||||
(** A root of computation, whose value(s) over time we're interested in. *)
|
||||
|
@ -100,7 +106,7 @@ val set_on_invalidate : 'a root -> ('a -> unit) -> unit
|
|||
(** Change the callback for the root.
|
||||
@see observe for more details. *)
|
||||
|
||||
val sample : 'a root -> 'a
|
||||
val sample : ?release_queue:release_queue -> 'a root -> 'a
|
||||
(** Force the computation of the value for this root.
|
||||
The value is cached, so this is idempotent, until the next invalidation. *)
|
||||
|
||||
|
@ -109,7 +115,7 @@ val is_damaged : 'a root -> bool
|
|||
cache. This can be the case if the value was never computed, or
|
||||
if it was computed and then invalidated. *)
|
||||
|
||||
val release : 'a root -> unit
|
||||
val release : ?release_queue:release_queue -> 'a root -> unit
|
||||
(** Forget about this root and release sub-values no longer reachable from
|
||||
any root. *)
|
||||
|
||||
|
|
Loading…
Reference in New Issue