Seq: new implementation
This commit is contained in:
parent
7be5636017
commit
07d92b205b
|
@ -101,56 +101,77 @@ module Reducer = struct
|
|||
type stats = {
|
||||
mutable marked: int;
|
||||
mutable shared: int;
|
||||
mutable blocked: int;
|
||||
}
|
||||
let mk_stats () = { marked = 0; shared = 0 }
|
||||
let mk_stats () = { marked = 0; shared = 0; blocked = 0 }
|
||||
|
||||
let rec discard stats mask = function
|
||||
let new_marked stats = stats.marked <- stats.marked + 1
|
||||
let new_shared stats = stats.shared <- stats.shared + 1
|
||||
let new_blocked stats = stats.blocked <- stats.blocked + 1
|
||||
|
||||
let rec block stats = function
|
||||
| Nil -> ()
|
||||
| Leaf t ->
|
||||
let mark = t.mark in
|
||||
if mark land both_mask <> 0 && mark land mask = 0 then (
|
||||
stats.marked <- stats.marked + 1;
|
||||
stats.shared <- stats.shared + 1;
|
||||
t.mark <- mark lor mask;
|
||||
| Leaf t' ->
|
||||
let mark = t'.mark in
|
||||
if mark land both_mask <> both_mask then (
|
||||
new_blocked stats;
|
||||
t'.mark <- mark lor both_mask
|
||||
)
|
||||
| Join t ->
|
||||
let mark = t.mark in
|
||||
if mark land both_mask <> 0 && mark land mask = 0 then (
|
||||
stats.marked <- stats.marked + 1;
|
||||
stats.shared <- stats.shared + 1;
|
||||
t.mark <- mark lor mask;
|
||||
discard stats mask t.l;
|
||||
discard stats mask t.r;
|
||||
| Join t' ->
|
||||
let mark = t'.mark in
|
||||
if mark land both_mask <> both_mask then (
|
||||
new_blocked stats;
|
||||
t'.mark <- mark lor both_mask;
|
||||
block stats t'.l;
|
||||
block stats t'.r;
|
||||
)
|
||||
|
||||
let enqueue stats q mask = function
|
||||
| Nil -> ()
|
||||
| Leaf t ->
|
||||
let mark = t.mark in
|
||||
stats.marked <- stats.marked + 1;
|
||||
| Leaf t' ->
|
||||
let mark = t'.mark in
|
||||
if mark land mask = 0 then (
|
||||
t.mark <- mark lor mask;
|
||||
if mark land both_mask <> 0 then
|
||||
stats.shared <- stats.shared + 1;
|
||||
(* Not yet seen *)
|
||||
new_marked stats;
|
||||
if mark land both_mask <> 0 then (
|
||||
(* Newly shared, clear mask *)
|
||||
t'.mark <- -1;
|
||||
new_blocked stats;
|
||||
new_shared stats;
|
||||
) else
|
||||
t'.mark <- mark lor mask;
|
||||
);
|
||||
if mark <> -1 && mark land both_mask = both_mask then (
|
||||
t'.mark <- -1;
|
||||
new_shared stats
|
||||
)
|
||||
| Join t as node ->
|
||||
let mark = t.mark in
|
||||
| Join t' as t ->
|
||||
let mark = t'.mark in
|
||||
if mark land mask = 0 then (
|
||||
stats.marked <- stats.marked + 1;
|
||||
t.mark <- mark lor mask;
|
||||
if mark land both_mask = 0 then (
|
||||
Queue.push node q
|
||||
(* Not yet seen *)
|
||||
new_marked stats;
|
||||
if mark land both_mask <> 0 then (
|
||||
(* Newly shared, clear mask *)
|
||||
t'.mark <- -1;
|
||||
new_blocked stats;
|
||||
new_shared stats;
|
||||
block stats t'.l;
|
||||
block stats t'.r;
|
||||
) else (
|
||||
stats.shared <- stats.shared + 1;
|
||||
discard stats mask t.l;
|
||||
discard stats mask t.r;
|
||||
(* First mark *)
|
||||
t'.mark <- mark lor mask;
|
||||
Queue.push t q
|
||||
)
|
||||
);
|
||||
if mark <> -1 && mark land both_mask = both_mask then (
|
||||
t'.mark <- -1;
|
||||
new_shared stats
|
||||
)
|
||||
|
||||
let dequeue stats q mask =
|
||||
match Queue.pop q with
|
||||
| Join t ->
|
||||
if t.mark land mask <> 0 then (
|
||||
if t.mark land both_mask = mask then (
|
||||
enqueue stats q mask t.l;
|
||||
enqueue stats q mask t.r;
|
||||
)
|
||||
|
@ -176,99 +197,143 @@ module Reducer = struct
|
|||
dropped : 'b option array;
|
||||
mutable dropped_leaf : int;
|
||||
mutable dropped_join : int;
|
||||
shared : ('a, 'b) xform array;
|
||||
shared : 'a seq array;
|
||||
shared_x : ('a, 'b) xform list array;
|
||||
mutable shared_index: int;
|
||||
}
|
||||
|
||||
let next_shared_index st =
|
||||
let result = st.shared_index in
|
||||
st.shared_index <- result + 1;
|
||||
result
|
||||
|
||||
let rec unblock = function
|
||||
| XEmpty -> ()
|
||||
| XLeaf {a = Nil | Join _; _} -> assert false
|
||||
| XJoin {a = Nil | Leaf _; _} -> assert false
|
||||
| XLeaf {a = Leaf t'; _} ->
|
||||
let mark = t'.mark in
|
||||
if mark <> -1 && mark land both_mask = both_mask then
|
||||
t'.mark <- mark land lnot both_mask;
|
||||
| XJoin {a = Join t'; l; r; _} ->
|
||||
let mark = t'.mark in
|
||||
if mark <> -1 && mark land both_mask = both_mask then (
|
||||
t'.mark <- mark land lnot both_mask;
|
||||
unblock l;
|
||||
unblock r
|
||||
)
|
||||
|
||||
let rec unmark_old st = function
|
||||
| XEmpty -> ()
|
||||
| XLeaf {a = Nil | Join _; _} -> assert false
|
||||
| XJoin {a = Nil | Leaf _; _} -> assert false
|
||||
| XLeaf {a = Leaf t'; b} as t ->
|
||||
let mark = t'.mark land both_mask in
|
||||
if mark = old_mask then (
|
||||
| XLeaf {a = Leaf t' as a; b} as t ->
|
||||
let mark = t'.mark in
|
||||
if mark land both_mask = old_mask then (
|
||||
let dropped_leaf = st.dropped_leaf in
|
||||
if dropped_leaf > -1 then (
|
||||
st.dropped.(dropped_leaf) <- b;
|
||||
st.dropped_leaf <- dropped_leaf + 1;
|
||||
);
|
||||
t'.mark <- mark land lnot both_mask
|
||||
) else if mark = both_mask then (
|
||||
let shared_index = st.shared_index in
|
||||
st.shared.(shared_index) <- t;
|
||||
st.shared_index <- shared_index + 1;
|
||||
t'.mark <- shared_index lsl mask_bits;
|
||||
) else if mark = -1 then (
|
||||
let index = next_shared_index st in
|
||||
st.shared.(index) <- a;
|
||||
st.shared_x.(index) <- [t];
|
||||
t'.mark <- (index lsl mask_bits) lor new_mask;
|
||||
) else if mark land both_mask = new_mask then (
|
||||
let index = mark lsr mask_bits in
|
||||
st.shared_x.(index) <- t :: st.shared_x.(index);
|
||||
) else if mark land both_mask = both_mask then (
|
||||
assert false
|
||||
(*t'.mark <- mark land lnot both_mask*)
|
||||
)
|
||||
| XJoin {a = Join t'; l; r; b} as t ->
|
||||
let mark = t'.mark land both_mask in
|
||||
if mark <> 0 then (
|
||||
if mark = old_mask then (
|
||||
let dropped_join = st.dropped_join - 1 in
|
||||
if dropped_join > -1 then (
|
||||
st.dropped.(dropped_join) <- b;
|
||||
st.dropped_join <- dropped_join;
|
||||
);
|
||||
t'.mark <- mark land lnot both_mask
|
||||
)
|
||||
else if mark = both_mask then (
|
||||
let shared_index = st.shared_index in
|
||||
st.shared.(shared_index) <- t;
|
||||
st.shared_index <- shared_index + 1;
|
||||
t'.mark <- shared_index lsl mask_bits;
|
||||
| XJoin {a = Join t' as a; l; r; b} as t ->
|
||||
let mark = t'.mark in
|
||||
if mark land both_mask = old_mask then (
|
||||
let dropped_leaf = st.dropped_leaf in
|
||||
if dropped_leaf > -1 then (
|
||||
st.dropped.(dropped_leaf) <- b;
|
||||
st.dropped_leaf <- dropped_leaf + 1;
|
||||
);
|
||||
t'.mark <- mark land lnot both_mask;
|
||||
unmark_old st l;
|
||||
unmark_old st r;
|
||||
) else if mark = -1 then (
|
||||
let index = next_shared_index st in
|
||||
st.shared.(index) <- a;
|
||||
st.shared_x.(index) <- [t];
|
||||
t'.mark <- (index lsl mask_bits) lor new_mask;
|
||||
unblock l;
|
||||
unblock r;
|
||||
) else if mark land both_mask = new_mask then (
|
||||
let index = mark lsr mask_bits in
|
||||
st.shared_x.(index) <- t :: st.shared_x.(index);
|
||||
) else if mark land both_mask = both_mask then (
|
||||
assert false
|
||||
)
|
||||
|
||||
let prepare_shared st =
|
||||
for i = 0 to st.shared_index - 1 do
|
||||
match st.shared_x.(i) with
|
||||
| [] -> assert false
|
||||
| [_] -> ()
|
||||
| xs -> st.shared_x.(i) <- List.rev xs
|
||||
done
|
||||
|
||||
let rec unmark_new st = function
|
||||
| Nil -> XEmpty
|
||||
| Leaf t' as t ->
|
||||
let mark = t'.mark in
|
||||
if mark land both_mask = new_mask then (
|
||||
let shared_index = st.shared_index in
|
||||
let x = XLeaf {a = t; b = None} in
|
||||
st.shared.(shared_index) <- x;
|
||||
st.shared_index <- shared_index + 1;
|
||||
t'.mark <- shared_index lsl mask_bits;
|
||||
x
|
||||
if mark <> -1 && mark land both_mask = both_mask then (
|
||||
let index = mark lsr mask_bits in
|
||||
match st.shared_x.(index) with
|
||||
| [] -> XLeaf {a = t; b = None}
|
||||
| x :: xs -> st.shared_x.(index) <- xs; x
|
||||
) else (
|
||||
assert (mark land both_mask = 0);
|
||||
st.shared.(mark lsr mask_bits)
|
||||
t'.mark <- 0;
|
||||
XLeaf {a = t; b = None}
|
||||
)
|
||||
| Join t' as t ->
|
||||
let mark = t'.mark in
|
||||
if mark land both_mask = new_mask then (
|
||||
let shared_index = st.shared_index in
|
||||
st.shared_index <- shared_index + 1;
|
||||
if mark = -1 then (
|
||||
let index = next_shared_index st in
|
||||
t'.mark <- 0;
|
||||
st.shared.(index) <- t;
|
||||
let l = unmark_new st t'.l in
|
||||
let r = unmark_new st t'.r in
|
||||
let x = XJoin {a = t; b = None; l; r} in
|
||||
st.shared.(shared_index) <- x;
|
||||
t'.mark <- shared_index lsl mask_bits;
|
||||
x
|
||||
XJoin {a = t; b = None; l; r}
|
||||
) else if mark land both_mask = both_mask then (
|
||||
let index = mark lsr mask_bits in
|
||||
match st.shared_x.(index) with
|
||||
| [] -> assert false
|
||||
| x :: xs ->
|
||||
st.shared_x.(index) <- xs;
|
||||
if xs == [] then t'.mark <- 0;
|
||||
x
|
||||
) else (
|
||||
assert (mark land both_mask = 0);
|
||||
st.shared.(mark lsr mask_bits)
|
||||
t'.mark <- t'.mark land lnot both_mask;
|
||||
let l = unmark_new st t'.l in
|
||||
let r = unmark_new st t'.r in
|
||||
XJoin {a = t; b = None; l; r}
|
||||
)
|
||||
|
||||
let rec check_ranks = function
|
||||
| Nil -> 0
|
||||
| Leaf t -> assert (t.mark = 0); 0
|
||||
| Join t ->
|
||||
let l = check_ranks t.l in
|
||||
let r = check_ranks t.r in
|
||||
let rank = max l r + 1 in
|
||||
assert (t.mark = rank lsl mask_bits);
|
||||
rank
|
||||
type 'b dropped = {
|
||||
leaves: int;
|
||||
table: 'b option array;
|
||||
extra_leaf: 'b list;
|
||||
extra_join: 'b list;
|
||||
}
|
||||
|
||||
let no_dropped =
|
||||
{ leaves = 0; table = [||]; extra_leaf = []; extra_join = [] }
|
||||
|
||||
let diff get_dropped xold tnew = match xold, tnew with
|
||||
| XEmpty, Nil -> 0, [||], XEmpty
|
||||
| (XLeaf {a; _} | XJoin {a; _}), _ when a == tnew -> 0, [||], xold
|
||||
| XEmpty, Nil -> no_dropped, XEmpty
|
||||
| (XLeaf {a; _} | XJoin {a; _}), _ when a == tnew -> no_dropped, xold
|
||||
| _ ->
|
||||
let qold = Queue.create () in
|
||||
let sold = mk_stats () in
|
||||
let qnew = Queue.create () in
|
||||
let snew = mk_stats () in
|
||||
let qold = Queue.create () and sold = mk_stats () in
|
||||
let qnew = Queue.create () and snew = mk_stats () in
|
||||
begin match xold with
|
||||
| XEmpty -> ()
|
||||
| (XLeaf {a; _} | XJoin {a; _}) ->
|
||||
|
@ -276,28 +341,44 @@ module Reducer = struct
|
|||
end;
|
||||
enqueue snew qnew new_mask tnew;
|
||||
traverse sold snew qold qnew;
|
||||
let nb_dropped = sold.marked - (sold.shared + snew.shared) in
|
||||
let nb_dropped = sold.marked - (sold.blocked + snew.blocked) in
|
||||
let st = {
|
||||
dropped = if get_dropped then Array.make nb_dropped None else [||];
|
||||
dropped_leaf = if get_dropped then 0 else -1;
|
||||
dropped_join = if get_dropped then nb_dropped else -1;
|
||||
shared = Array.make snew.marked XEmpty;
|
||||
shared = Array.make (sold.shared + snew.shared) Nil;
|
||||
shared_x = Array.make (sold.shared + snew.shared) [];
|
||||
shared_index = 0;
|
||||
} in
|
||||
unmark_old st xold;
|
||||
assert (st.dropped_leaf = st.dropped_join);
|
||||
prepare_shared st;
|
||||
let result = unmark_new st tnew in
|
||||
let restore_rank = function
|
||||
| Nil -> assert false
|
||||
| Leaf t -> t.mark <- 0
|
||||
| Join t ->
|
||||
t.mark <- (max (rank t.l) (rank t.r) + 1) lsl mask_bits
|
||||
in
|
||||
for i = st.shared_index - 1 downto 0 do
|
||||
match st.shared.(i) with
|
||||
| XEmpty
|
||||
| XLeaf {a = Nil | Join _; _}
|
||||
| XJoin {a = Nil | Leaf _; _} -> assert false
|
||||
| XLeaf {a = Leaf t; _} -> t.mark <- 0
|
||||
| XJoin {a = Join t; _} ->
|
||||
t.mark <- (max (rank t.l) (rank t.r) + 1) lsl mask_bits
|
||||
restore_rank st.shared.(i)
|
||||
done;
|
||||
ignore (check_ranks tnew);
|
||||
st.dropped_leaf, st.dropped, result
|
||||
if get_dropped then (
|
||||
let xleaf = ref [] in
|
||||
let xjoin = ref [] in
|
||||
for i = 0 to st.shared_index - 1 do
|
||||
List.iter (function
|
||||
| XLeaf { b = Some b; _} -> xleaf := b :: !xleaf
|
||||
| XJoin { b = Some b; _} -> xjoin := b :: !xjoin
|
||||
| _ -> ()
|
||||
) st.shared_x.(i)
|
||||
done;
|
||||
({ leaves = st.dropped_leaf;
|
||||
table = st.dropped;
|
||||
extra_leaf = !xleaf;
|
||||
extra_join = !xjoin }, result)
|
||||
) else
|
||||
no_dropped, result
|
||||
|
||||
type ('a, 'b) map_reduce = ('a -> 'b) * ('b -> 'b -> 'b)
|
||||
let map (f, _) x = f x
|
||||
|
@ -329,18 +410,13 @@ module Reducer = struct
|
|||
eval map_reduce tree
|
||||
|
||||
let update (map_reduce, old_tree : _ reducer) new_tree : _ reducer =
|
||||
let _leaves, _dropped, tree = diff false old_tree new_tree in
|
||||
let _, tree = diff false old_tree new_tree in
|
||||
(map_reduce, tree)
|
||||
|
||||
type 'b dropped = {
|
||||
leaves: int;
|
||||
table: 'b option array;
|
||||
}
|
||||
|
||||
let update_and_get_dropped (map_reduce, old_tree : _ reducer) new_tree
|
||||
: _ dropped * _ reducer =
|
||||
let leaves, table, tree = diff true old_tree new_tree in
|
||||
{ leaves; table }, (map_reduce, tree)
|
||||
let dropped, tree = diff true old_tree new_tree in
|
||||
(dropped, (map_reduce, tree))
|
||||
|
||||
let fold_dropped kind f dropped acc =
|
||||
let acc = ref acc in
|
||||
|
|
Loading…
Reference in New Issue