POC/RFC: Introduce [release_queue]

This commit is contained in:
Frédéric Bour 2020-03-24 20:11:39 +01:00
parent e299f31590
commit 454562301a
2 changed files with 116 additions and 65 deletions

View File

@ -227,8 +227,15 @@ let invalidate = function
end
| _ -> assert false
type release_list =
| Release_done
| Release_more :
{ origin : 'a t; element : 'b t; next : release_list } -> release_list
type release_queue = release_list ref
let make_release_queue () = ref Release_done
type release_failure = exn * Printexc.raw_backtrace
exception Release_failure of release_failure list
(* [sub_release [] origin self] is called when [origin] is released,
where [origin] is reachable from [self]'s trace.
@ -418,61 +425,58 @@ let activate_tracing self origin = function
[sub_sample] raise if any user-provided computation raises.
Graph will be left in a coherent state but exception will be propagated
to the observer. *)
let rec sub_sample : type a b . a t -> b t -> b = fun origin ->
function
| Root _ -> assert false
| Pure x | Impure x -> x
| Operator t as self ->
(* try to use cached value, if present *)
match t.value with
| Eval_some value -> value
| _ ->
t.value <- Eval_progress;
let result : b = match t.desc with
| Map (x, f) -> f (sub_sample self x)
| Map2 (x, y, f) -> f (sub_sample self x) (sub_sample self y)
| Pair (x, y) -> (sub_sample self x, sub_sample self y)
| App (f, x) -> (sub_sample self f) (sub_sample self x)
| Join x ->
let old_intermediate = x.intermediate in
let intermediate =
(* We haven't touched any state yet,
it is safe for [sub_sample] to raise *)
sub_sample self x.child
in
x.intermediate <- Some intermediate;
sub_acquire self intermediate;
let result = sub_sample self intermediate in
begin match old_intermediate with
| None -> result
| Some x' ->
(* NOTE: if [intermediate==x'], should we stop there? *)
(* release old value [x'], catching potential exceptions *)
match sub_release [] self x' with
| [] -> result
| failures ->
(* Commit result, just like normal continuation *)
begin match t.value with
| Eval_progress -> t.value <- Eval_some result
| Eval_none | Eval_some _ -> ()
end;
activate_tracing self origin t.trace;
(* Raise release exception *)
raise (Release_failure failures)
end
| Var x -> x.binding
| Prim t -> t.acquire ()
in
begin match t.value with
| Eval_progress -> t.value <- Eval_some result;
| Eval_none | Eval_some _ -> ()
end;
(* [self] just became active, so it may invalidate [origin] in case its
value changes because of [t.desc], like if it's a variable and gets
mutated, or if it's a primitive that gets invalidated.
We need to put [origin] into [self.trace] in case it isn't there yet. *)
activate_tracing self origin t.trace;
result
let sub_sample queue =
let rec aux : type a b . a t -> b t -> b = fun origin ->
function
| Root _ -> assert false
| Pure x | Impure x -> x
| Operator t as self ->
(* try to use cached value, if present *)
match t.value with
| Eval_some value -> value
| _ ->
t.value <- Eval_progress;
let result : b = match t.desc with
| Map (x, f) -> f (aux self x)
| Map2 (x, y, f) -> f (aux self x) (aux self y)
| Pair (x, y) -> (aux self x, aux self y)
| App (f, x) -> (aux self f) (aux self x)
| Join x ->
let intermediate =
(* We haven't touched any state yet,
it is safe for [aux] to raise *)
aux self x.child
in
begin match x.intermediate with
| None ->
x.intermediate <- Some intermediate;
sub_acquire self intermediate;
| Some x' when x' != intermediate ->
queue := Release_more {
origin = self;
element = x';
next = !queue;
};
x.intermediate <- Some intermediate;
sub_acquire self intermediate;
| Some _ -> ()
end;
aux self intermediate
| Var x -> x.binding
| Prim t -> t.acquire ()
in
begin match t.value with
| Eval_progress -> t.value <- Eval_some result;
| Eval_none | Eval_some _ -> ()
end;
(* [self] just became active, so it may invalidate [origin] in case its
value changes because of [t.desc], like if it's a variable and gets
mutated, or if it's a primitive that gets invalidated.
We need to put [origin] into [self.trace] in case it isn't there yet. *)
activate_tracing self origin t.trace;
result
in
aux
type 'a root = 'a t
@ -486,7 +490,42 @@ let observe ?(on_invalidate=ignore) child : _ root =
} in
root
let sample = function
exception Release_failure of exn option * release_failure list
let raw_flush_release_queue queue =
let rec aux failures = function
| Release_done -> failures
| Release_more t ->
let failures = sub_release failures t.origin t.element in
aux failures t.next
in
aux [] queue
let flush_release_queue queue =
let queue' = !queue in
queue := Release_done;
raw_flush_release_queue queue'
let flush_or_fail main_exn queue =
match raw_flush_release_queue queue with
| [] -> ()
| failures -> raise (Release_failure (main_exn, failures))
let start_sub_sample queue self child =
let queue, internal_queue = match queue with
| None -> (ref Release_done, true)
| Some queue -> (queue, false)
in
match sub_sample queue self child with
| result ->
if internal_queue then
flush_or_fail None !queue;
result
| exception exn when internal_queue ->
flush_or_fail (Some exn) !queue;
raise exn
let sample ?release_queue = function
| Pure _ | Impure _ | Operator _ -> assert false
| Root t as self ->
match t.value with
@ -498,7 +537,7 @@ let sample = function
sub_acquire self t.child;
);
t.value <- Eval_progress;
let value = sub_sample self t.child in
let value = start_sub_sample release_queue self t.child in
begin match t.value with
| Eval_progress -> t.value <- Eval_some value; (* cache value *)
| Eval_none | Eval_some _ -> ()
@ -510,16 +549,22 @@ let is_damaged = function
| Root {value = Eval_some _; _} -> false
| Root {value = Eval_none | Eval_progress; _} -> true
let release = function
let release ?release_queue = function
| Pure _ | Impure _ | Operator _ -> assert false
| Root t as self ->
if t.acquired then (
(* release subtree, remove cached value *)
t.value <- Eval_none;
t.acquired <- false;
match sub_release [] self t.child with
| [] -> ()
| failures -> raise (Release_failure failures)
begin match release_queue with
| Some batch ->
batch :=
Release_more { origin = self; element = t.child; next = !batch }
| None ->
match sub_release [] self t.child with
| [] -> ()
| failures -> raise (Release_failure (None, failures))
end
)
let set_on_invalidate x f =

View File

@ -84,8 +84,14 @@ val prim : acquire:(unit -> 'a) -> release:('a -> unit) -> 'a prim
val get_prim : 'a prim -> 'a t
val invalidate : 'a prim -> unit
(** Releasing unused graphs *)
type release_failure = exn * Printexc.raw_backtrace
exception Release_failure of release_failure list
exception Release_failure of exn option * release_failure list
type release_queue
val make_release_queue : unit -> release_queue
val flush_release_queue : release_queue -> release_failure list
type 'a root
(** A root of computation, whose value(s) over time we're interested in. *)
@ -100,7 +106,7 @@ val set_on_invalidate : 'a root -> ('a -> unit) -> unit
(** Change the callback for the root.
@see observe for more details. *)
val sample : 'a root -> 'a
val sample : ?release_queue:release_queue -> 'a root -> 'a
(** Force the computation of the value for this root.
The value is cached, so this is idempotent, until the next invalidation. *)
@ -109,7 +115,7 @@ val is_damaged : 'a root -> bool
cache. This can be the case if the value was never computed, or
if it was computed and then invalidated. *)
val release : 'a root -> unit
val release : ?release_queue:release_queue -> 'a root -> unit
(** Forget about this root and release sub-values no longer reachable from
any root. *)