diff --git a/src/mria_autoheal.erl b/src/mria_autoheal.erl index b7c826d..04f9a88 100644 --- a/src/mria_autoheal.erl +++ b/src/mria_autoheal.erl @@ -25,12 +25,14 @@ -record(autoheal, {delay, role, proc, timer}). -type autoheal() :: #autoheal{}. +-type cluster_view() :: {node(), [node()], [node()]}. -export_type([autoheal/0]). -include_lib("snabbkaffe/include/trace.hrl"). -define(DEFAULT_DELAY, 15000). +-define(CLUSTER_RPC_TIMEOUT, 5000). -define(LOG(Level, Format, Args), logger:Level("Mria(Autoheal): " ++ Format, Args)). @@ -77,24 +79,12 @@ handle_msg({report_partition, Node}, Autoheal = #autoheal{delay = Delay, timer = handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, timer = TRef}) when Node =:= node() -> ensure_cancel_timer(TRef), - case is_majority_alive() of + Nodes = mria_mnesia:db_nodes(), + ClusterViews = collect_cluster_views(Nodes), + HasMajority = length(ClusterViews) > length(Nodes) div 2, + case HasMajority of true -> - Nodes = mria_mnesia:db_nodes(), - RPCResult = erpc:multicall(Nodes, mria_mnesia, running_nodes, []), - SplitView = lists:foldl(fun({N, Result}, Acc) -> - case Result of - {ok, Peers} -> - Acc #{N => Peers}; - _ -> - %% Ignore unreachable nodes: - Acc - end - end, - #{}, - lists:zip(Nodes, RPCResult)), - Cliques = lists:sort(fun compare_cliques/2, - mria_lib:find_clusters(SplitView)), - mria_node_monitor:cast(coordinator(Cliques), {heal_partition, Cliques}), + apply_heal_plan(ClusterViews), Autoheal#autoheal{timer = undefined}; false -> Autoheal#autoheal{timer = mria_node_monitor:run_after(Delay, {autoheal, Msg})} @@ -118,38 +108,120 @@ handle_msg({heal_partition, Cliques}, Autoheal= #autoheal{proc = _Proc}) -> handle_msg({'EXIT', Pid, normal}, Autoheal = #autoheal{proc = Pid}) -> Autoheal#autoheal{proc = undefined}; -handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{proc = Pid}) -> - ?LOG(critical, "Autoheal process crashed: ~s", [Reason]), +handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{delay = 
Delay, proc = Pid}) -> + ?LOG(critical, "Autoheal process crashed: ~p", [Reason]), + mria_node_monitor:run_after(Delay, confirm_partition), Autoheal#autoheal{proc = undefined}; handle_msg(Msg, Autoheal) -> ?LOG(critical, "Unexpected msg: ~p", [Msg, Autoheal]), Autoheal. -compare_cliques(Running1, Running2) -> - Len1 = length(Running1), Len2 = length(Running2), - if - Len1 > Len2 -> true; - Len1 == Len2 -> lists:member(node(), Running1); - true -> false +-spec collect_cluster_views([node()]) -> [cluster_view()]. +collect_cluster_views(Nodes) -> + RPCResult = erpc:multicall(Nodes, mria_mnesia, cluster_view, [], ?CLUSTER_RPC_TIMEOUT), + [ {Node, Running, Stopped} + %% Ignore unreachable nodes: + || {Node, {ok, {Running, Stopped}}} <- lists:zip(Nodes, RPCResult)]. +-spec apply_heal_plan([cluster_view()]) -> ok. +apply_heal_plan(ClusterViews) -> + case find_split_view(ClusterViews) of + SplitView = [Survivors | Rest] -> + Victims = lists:usort(lists:append(Rest)), + Coordinator = coordinator(Survivors), + ?tp(info, mria_autoheal_plan, #{ survivors => Survivors + , victims => Victims + , split_view => SplitView + , coordinator => Coordinator + }), + case Victims of + [_ | _] -> + mria_node_monitor:cast(Coordinator, + {heal_partition, [Survivors, Victims]}); + [] -> + ok + end; + [] -> + ok + end. + +%% The purpose of this function is to find the largest set of nodes to survive +%% the partition heal. As these nodes will seed all restarting nodes, they +%% should contain a consistent set of Mria data, i.e. they should have +%% replicated the same set of transactions. +%% +%% These survivor nodes are chosen according to a reachability matrix: +%% 1. Each node starts with a bit vector containing only itself. +%% 2. For every node `RN' reported as running by node `N', RN's reachability +%% vector is updated. This means each final vector represents the set of +%% nodes that reported the corresponding node as running (reachable). +%% 3. 
The largest set of nodes that agree on their reachability vectors is +%% chosen as survivors. All other sets of nodes are considered victims. +%% +%% If there are several equally large such sets, the one that compares lower is +%% preferred, according to Erlang term order. +%% +%% The set of survivor nodes is returned at the head of the resulting list, +%% while the tail contains sets of victim nodes, potentially separated into +%% disagreeing partitions. +-spec find_split_view([{node(), _Running :: [node()], _Partitioned :: [node()]}]) -> + [_Survivors :: [node()] | _Victims :: [[node()]]]. +find_split_view(ClusterViews = [_ | _]) -> + Cluster = lists:sort([N || {N, _, _} <- ClusterViews]), + Vectors0 = maps:from_list(lists:zipwith( + fun(N, Idx) -> {N, 1 bsl Idx} end, + Cluster, + lists:seq(0, length(Cluster) - 1) + )), + Vectors = lists:foldl( + fun({N, Running, _Stopped}, Vectors1) -> + Flag = maps:get(N, Vectors0), + lists:foldl( + fun(RN, Vectors) -> + case maps:is_key(RN, Vectors) of + true -> maps:update_with(RN, fun(V) -> V bor Flag end, Vectors); + false -> Vectors + end + end, + Vectors1, + Running) + end, + Vectors0, + ClusterViews), + Components = maps:values( + maps:groups_from_list( fun({_, V}) -> V end + , fun({N, _}) -> N end + , maps:to_list(Vectors))), + lists:sort( fun compare_components/2 + , [lists:sort(C) || C <- Components]); +find_split_view([]) -> + []. + +%% Compares components by size, i.e. by the number of nodes sharing a reachability vector. +%% Orders the larger component before the smaller one; ties go to the one that compares lower. +compare_components(C0, C1) -> + case length(C0) - length(C1) of + 0 -> C0 =< C1; + N -> N > 0 end. --spec coordinator([[node()]]) -> node(). -coordinator([Majority | _]) -> - mria_membership:coordinator(Majority). +-spec coordinator([node()]) -> node(). +coordinator(Survivors) -> + mria_membership:coordinator(Survivors). -spec heal_partition([[node()]]) -> ok. 
heal_partition([[_Majority]]) -> %% There are no partitions: ok; heal_partition([Majority|Minorities]) -> - Result = reboot_minority(lists:append(Minorities)), + Result = reboot_partitioned(lists:append(Minorities)), mria_lib:exec_callback(heal_partition, {Majority, Minorities}), Result. -reboot_minority(Minority) -> - ?tp(info, "Rebooting minority", #{nodes => Minority}), - lists:foreach(fun rejoin/1, Minority). +reboot_partitioned(Nodes) -> + ?tp(info, "Rebooting partitions", #{nodes => Nodes}), + lists:foreach(fun rejoin/1, Nodes). rejoin(Node) -> Ret = rpc:call(Node, mria, join, [node(), heal]), @@ -163,7 +235,169 @@ ensure_cancel_timer(undefined) -> ensure_cancel_timer(TRef) -> catch erlang:cancel_timer(TRef). -is_majority_alive() -> - All = mria_mnesia:cluster_nodes(all), - NotAliveLen = length(All -- [node() | nodes()]), - NotAliveLen < (length(All) div 2). +%%================================================================================ +%% Unit tests +%%================================================================================ + +-ifdef(TEST). + +-include_lib("proper/include/proper_common.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). + +split_view_empty_test_() -> + ?_assertMatch([], find_split_view([])). + +split_view_no_partition_test_() -> + ?_assertMatch([[1, 2, 3]], + find_split_view([ {1, [1, 2, 3], []} + , {2, [1, 2, 3], []} + , {3, [1, 2, 3], []} + ])). 
+ +split_view_symmetric_partition_test_() -> + [ ?_assertMatch([[2, 3], [1]], + find_split_view([ {1, [1], [2, 3]} + , {2, [2, 3], [1]} + , {3, [2, 3], [1]} + ])) + , ?_assertMatch([[1, 2], [3, 4]], + find_split_view([ {1, [1, 2], [3, 4]} + , {2, [1, 2], [3, 4]} + , {3, [3, 4], [1, 2]} + , {4, [3, 4], [1, 2]} + ])) + , ?_assertMatch([[1, 2, 3], [4, 5], [6]], + find_split_view([ {1, [1, 2, 3], [4, 5, 6]} + , {2, [1, 2, 3], [4, 5, 6]} + , {3, [1, 2, 3], [4, 5, 6]} + , {4, [4, 5], [1, 2, 3, 6]} + , {5, [4, 5], [1, 2, 3, 6]} + , {6, [4, 5, 6], [1, 2, 3]} + ])) + ]. + +split_view_full_split_test_() -> + ?_assertMatch([[1], [2], [3], [4]], + find_split_view([ {1, [1], [2, 3, 4]} + , {2, [2], [1, 3, 4]} + , {3, [3], [1, 2, 4]} + , {4, [4], [1, 2, 3]} + ])). + +split_view_overlapping_partition_test_() -> + [ ?_assertMatch([[1], [2], [3], [4]], + find_split_view([ {1, [1, 4], [2, 3]} + , {2, [2, 3], [1, 4]} + , {3, [2, 3, 4], [1]} + , {4, [1, 3, 4], [2]}])) + , ?_assertMatch([[1], [2], [3], [4]], + find_split_view([ {1, [4, 1, 2], [3]} + , {2, [1, 2, 3], [4]} + , {3, [2, 3, 4], [1]} + , {4, [3, 4, 1], [2]}])) + , ?_assertMatch([[1, 2, 3], [4], [5]], + find_split_view([ {1, [1, 2, 3, 4, 5], []} + , {2, [1, 2, 3, 4, 5], []} + , {3, [1, 2, 3, 4, 5], []} + , {4, [1, 2, 3, 4], [5]} + , {5, [1, 2, 3, 5], [4]}])) + , ?_assertMatch([[1, 2], [3, 4], [5], [6]], + find_split_view([ {1, [1, 2], [3, 4, 5]} + , {2, [1, 2], [3, 4, 5]} + , {3, [3, 4, 5, 6], [1, 2]} + , {4, [3, 4, 5, 6], [1, 2]} + , {5, [3, 4, 5], [1, 2, 6]} + , {6, [3, 4, 6], [1, 2, 5]}])) + + , ?_assertMatch([[1], [2], [3], [4], [5]], + find_split_view([ {1, [1, 2, 3, 4, 5], []} + , {2, [1, 2, 3, 4], [5]} + , {3, [1, 2, 3, 5], [4]} + , {4, [1, 2, 4, 5], [3]} + , {5, [1, 3, 4, 5], [2]}])) + ]. + +split_view_asymm_partition_test_() -> + ?_assertMatch([[1, 2], [3], [4]], + find_split_view([ {1, [1, 2, 4], [3]} + , {2, [1, 2, 4], [3]} + , {3, [3, 4], [1, 2]} + , {4, [1, 2, 4], [3]} + ])). 
+ +split_view_single_component_overlapping_test_() -> + [ ?_assertMatch([[1, 2, 3], [6, 7], [4], [5]], + find_split_view([ {1, [1, 2, 3, 4, 5], [6, 7]} + , {2, [1, 2, 3, 4, 5], [6, 7]} + , {3, [1, 2, 3, 4, 5], [6, 7]} + , {4, [1, 2, 3, 4, 6, 7], [5]} + , {5, [1, 2, 3, 5, 6, 7], [4]} + , {6, [4, 5, 6, 7], [1, 2, 3]} + , {7, [4, 5, 6, 7], [1, 2, 3]}])) + , ?_assertMatch([[2, 3], [4, 5], [1], [6]], + find_split_view([ {1, [1, 6, 2, 3], [4, 5]} + , {2, [2, 1, 3], [4, 5, 6]} + , {3, [3, 1, 2], [4, 5, 6]} + , {4, [4, 5, 6], [1, 2, 3]} + , {5, [5, 4, 6], [1, 2, 3]} + , {6, [6, 1, 4, 5], [2, 3]} + ])) + ]. + +prop_split_view_complete_test_() -> + Config = [{proper, #{numtests => 100, max_size => 300, timeout => 15000}}], + {timeout, 20, ?_test(?run_prop(Config, + ?FORALL(ClusterViews, t_cluster_views(), + case find_split_view(ClusterViews) of + [] -> true; + [Survivors | Rest] -> + ClusterNodes = lists:sort([Node || {Node, _, _} <- ClusterViews]), + Victims = lists:append(Rest), + {conjunction, [ + {survivors_victims_disjoint, + proper:equals(Survivors, Survivors -- Victims)}, + {no_missed_nodes, + proper:equals(lists:sort(Survivors), ClusterNodes -- Victims)} + ]} + end)))}. + +prop_split_view_nonempty_survivors_test_() -> + Config = [{proper, #{numtests => 100, max_size => 300, timeout => 15000}}], + {timeout, 20, ?_test(?run_prop(Config, + ?FORALL(ClusterViews, t_nonempty_cluster_views(), + case find_split_view(ClusterViews) of + [] -> false; + [Survivors | _] -> Survivors =/= [] + end)))}. + +t_nonempty_cluster_views() -> + ?SUCHTHAT(X, t_cluster_views(), X =/= []). 
+ +t_cluster_views() -> + ?LET(NNodes, ?SIZED(S, S), + ?LET(NPartitions, proper_types:oneof([0, 0, 0, 0, 1, 2, 3, 4]), + ?LET(LBoundaries, [proper_types:range(1, NNodes) || _ <- lists:seq(1, NPartitions)], + ?LET(NBrokenLinks, proper_types:non_neg_integer(), + ?LET(LBrokenLinks, [ {proper_types:range(1, NNodes), proper_types:range(1, NNodes)} + || _ <- lists:seq(1, NBrokenLinks)], + begin + Cluster = lists:seq(1, NNodes), + Boundaries = lists:usort([1, NNodes + 1 | LBoundaries]), + Partitions = lists:zipwith( fun(N1, N2) -> lists:seq(N1, N2 - 1) end + , Boundaries + , tl(Boundaries) + , trim), + BrokenLinks = sets:from_list(LBrokenLinks, [{version, 2}]), + IsBrokenLink = fun + (N1, N1) -> false; + (N1, N2) -> sets:is_element({N1, N2}, BrokenLinks) orelse + sets:is_element({N2, N1}, BrokenLinks) + end, + lists:append([ [ {Node, [N || N <- Nodes, not IsBrokenLink(N, Node)], + [N || N <- Nodes, IsBrokenLink(N, Node)] ++ (Cluster -- Nodes)} + || Node <- Nodes] + || Nodes <- Partitions]) + end))))). + +-endif. diff --git a/test/mria_autoheal_SUITE.erl b/test/mria_autoheal_SUITE.erl index f0221d6..cb73fd8 100644 --- a/test/mria_autoheal_SUITE.erl +++ b/test/mria_autoheal_SUITE.erl @@ -88,6 +88,84 @@ t_autoheal_with_replicants(Config) when is_list(Config) -> end, [fun ?MODULE:prop_callbacks/1]). 
+t_autoheal_overlapping_parition(Config) when is_list(Config) -> + Cluster = mria_ct:cluster([core, core, core, core], + [{mria, cluster_autoheal, 200}], + [{beam_args, "-kernel prevent_overlapping_partitions false"}]), + ?check_trace( + #{timetrap => 25000}, + try + Nodes = [N1, N2, N3, N4] = mria_ct:start_cluster(mria, Cluster), + %% Simulate netsplit: + true = rpc:cast(N4, erlang, disconnect_node, [N3]), + ok = timer:sleep(1000), + %% Nodes report overlapping partitions: + ?assertMatch({[N1, N2, N3, N4], []}, view(N1)), + ?assertMatch({[N1, N2, N3, N4], []}, view(N2)), + ?assertMatch({[N1, N2, N3], [N4]}, view(N3)), + ?assertMatch({[N1, N2, N4], [N3]}, view(N4)), + %% Wait for autoheal, it should happen automatically: + ?retry(1000, 20, + begin + ?assertMatch({Nodes, []}, view(N1)), + ?assertMatch({Nodes, []}, view(N2)), + ?assertMatch({Nodes, []}, view(N3)), + ?assertMatch({Nodes, []}, view(N4)) + end), + Nodes + after + ok = mria_ct:teardown_cluster(Cluster) + end, + [ fun ?MODULE:prop_callbacks/1 + , fun([N1, N2, N3, N4], Trace) -> + %% Both N3 and N4 are potentially inconsistent and should be restarted: + ?assertMatch( [#{survivors := [N1, N2], victims := [N3, N4]}] + , ?of_kind(mria_autoheal_plan, Trace)), + ?assertMatch( [#{nodes := [N3, N4]}] + , ?of_kind("Rebooting partitions", Trace)) + end + ]). 
+ +t_autoheal_complex_overlapping_paritions(Config) when is_list(Config) -> + Cluster = mria_ct:cluster([core, core, core, core], + [{mria, cluster_autoheal, 200}], + [{beam_args, "-kernel prevent_overlapping_partitions false"}]), + ?check_trace( + #{timetrap => 25000}, + try + Nodes = [N1, N2, N3, N4] = mria_ct:start_cluster(mria, Cluster), + %% Simulate netsplit: + true = rpc:cast(N1, erlang, disconnect_node, [N2]), + true = rpc:cast(N1, erlang, disconnect_node, [N3]), + true = rpc:cast(N2, erlang, disconnect_node, [N4]), + ok = timer:sleep(1000), + %% Nodes report overlapping partitions: + ?assertMatch({[N1, N4], [N2, N3]}, view(N1)), + ?assertMatch({[N2, N3], [N1, N4]}, view(N2)), + ?assertMatch({[N2, N3, N4], [N1]}, view(N3)), + ?assertMatch({[N1, N3, N4], [N2]}, view(N4)), + %% Wait for autoheal, it should happen automatically: + ?retry(1000, 20, + begin + ?assertMatch({Nodes, []}, view(N1)), + ?assertMatch({Nodes, []}, view(N2)), + ?assertMatch({Nodes, []}, view(N3)), + ?assertMatch({Nodes, []}, view(N4)) + end), + Nodes + after + ok = mria_ct:teardown_cluster(Cluster) + end, + [ fun ?MODULE:prop_callbacks/1 + , fun([N1, N2, N3, N4], Trace) -> + %% All but one node are potentially inconsistent and should be restarted: + ?assertMatch( [#{survivors := [N1], victims := [N2, N3, N4]}] + , ?of_kind(mria_autoheal_plan, Trace)), + ?assertMatch( [#{nodes := [N2, N3, N4]}] + , ?of_kind("Rebooting partitions", Trace)) + end + ]). 
+ t_autoheal_majority_reachable(Config) when is_list(Config) -> Cluster = mria_ct:cluster([core, core, core, core, core], [{mria, cluster_autoheal, 200}]), ?check_trace( @@ -200,7 +278,7 @@ assert_replicant_bootstrapped(R, C, Trace) -> %% Verify that mria callbacks have been executed during heal prop_callbacks(Trace0) -> {Trace, _} = ?split_trace_at(#{?snk_kind := teardown_cluster}, Trace0), - {_, [HealEvent|AfterHeal]} = ?split_trace_at(#{?snk_kind := "Rebooting minority"}, Trace), + {_, [HealEvent|AfterHeal]} = ?split_trace_at(#{?snk_kind := "Rebooting partitions"}, Trace), #{nodes := Minority} = HealEvent, %% Check that all minority nodes have been restarted: [?assert( diff --git a/test/mria_ct.erl b/test/mria_ct.erl index 0ff3e85..4feaaca 100644 --- a/test/mria_ct.erl +++ b/test/mria_ct.erl @@ -87,6 +87,7 @@ cluster(Specs0, CommonEnv, ClusterOpts) -> , number => Number , role => Role , code_paths => CodePaths + , beam_args => proplists:get_value(beam_args, ClusterOpts, "") , cover => Cover } || #{role := Role, name := Name, env := Env, code_paths := CodePaths, num := Number, cover := Cover} <- Specs]. @@ -103,10 +104,11 @@ start_cluster(mria_async, Specs) -> spawn(fun() -> [start_mria(I) || I <- Specs] end), Ret. -start_slave(node, #{name := Name, env := Env, code_paths := CodePaths, cover := Cover}) -> +start_slave(node, #{name := Name, env := Env, code_paths := CodePaths, cover := Cover} = Spec) -> CommonBeamOpts = "+S 1:1 " % We want VMs to only occupy a single core "-kernel inet_dist_listen_min 3000 " % Avoid collisions with gen_rpc ports - "-kernel inet_dist_listen_max 3050 ", + "-kernel inet_dist_listen_max 3050 " + ++ maps:get(beam_args, Spec, "") ++ " ", Node = do_start_slave(Name, CommonBeamOpts), Self = filename:dirname(code:which(?MODULE)), [rpc:call(Node, code, add_patha, [Path]) || Path <- [Self|CodePaths]],