
Handling Deleted Items in RT and Fullsync #489

@lordnull


ZenDesk issue 6262: Netic is seeing a consistent number of "Missing" counts in full sync logs. A simulation that reproduces the same results is posted below.

The short version:
Given:
Two clusters, A and B, with bidirectional realtime replication.
Puts, updates, and deletes are done on both A and B.
If the same key is put on both A and B and subsequently deleted on B, a full sync from A to B will report the key as missing, because realtime may not send the delete to A (or A may not handle it well).
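The scenario above can be spelled out for a single key. This is a hypothetical sketch, not the load used in the simulation: it assumes the devrel HTTP ports that nomall uses (10018 on dev1 for cluster A, 10048 on dev4 for cluster B), Riak's legacy /riak/<bucket>/<key> HTTP interface, and an illustrative function name. A single run is not expected to trigger the problem, which only seems to appear at scale; it only shows the order of operations.

```bash
#!/bin/bash
# Hypothetical walk-through of the "put on both, delete on B" scenario for one key.
# ASSUMPTION: devrel HTTP ports as used by nomall (dev1 = cluster A, dev4 = cluster B).
RIAK_A=http://127.0.0.1:10018
RIAK_B=http://127.0.0.1:10048

same_key_put_then_delete() {
    local bucket=$1 key=$2
    # Write the same key on both clusters; realtime replicates each write to the other side.
    curl -sS -X PUT -H 'Content-Type: text/plain' -d 'from A' "$RIAK_A/riak/$bucket/$key"
    curl -sS -X PUT -H 'Content-Type: text/plain' -d 'from B' "$RIAK_B/riak/$bucket/$key"
    # Delete it on B only; realtime is expected to carry the delete over to A.
    curl -sS -X DELETE "$RIAK_B/riak/$bucket/$key"
}
```

For example, `same_key_put_then_delete b1 12345`. If realtime drops or mishandles the delete, A still holds the key, and a later full sync from A to B reports it as missing.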

This only seems to appear at a larger scale.

Simulation goals: reliably reproduce the missing-keys message in the logs and determine whether data is being lost. Realtime must not be overloaded during the simulation.

To generate the load, I patched basho_bench's riakc_pb driver so that each action goes to a bucket picked at random from a configured list (the oneof/1 helper in the patch). While the simulation could be done with one bucket, this more accurately reflects the situation at the client site.

Two basho_bench load configs were used, since the two clusters have different IPs/ports. A unified version is presented below.

To determine whether data is lost, I wrote a script, 'nomall', that queries the entire key space on both clusters. For each key, the bucket, the key, and whether it was found are appended to a file for that cluster (ca for A, cb for B). A simple diff of the two files shows which keys differ. A full sync in both directions shows how much difference there really is, and a manual check of the keys from an earlier diff shows that keys are not lost.

Steps to simulate:

  1. Set up the simulation environment.
    a. Use a riak_ee 1.4.x devrel.
    b. dev1-3 form cluster A and dev4-6 form cluster B. Name the clusters accordingly.
    c. Enable and start realtime from A to B and from B to A.
    d. Enable (but do not start) full sync from A to B and from B to A. If a full sync starts on connect, wait for it to finish.
  2. Begin loading the clusters.
    a. Patch basho_bench to allow for more than one bucket.
    b. Start a basho_bench instance for each cluster.
    c. Allow the load script to finish.
    d. Allow the realtime queue to empty.
  3. Begin loading and full syncing the clusters.
    a. Alter the load configs to use a new range of keys.
    b. Start a basho_bench instance for each cluster.
    c. Wait 2 minutes.
    d. Begin a full sync from A to B.
    e. Allow the full sync to finish; this takes much longer than the load script.
  4. Check the environment.
    a. Check the logs on the source and sink sides, and note that the source reports missing keys.
    b. Run 'nomall'. Take note of the diff, then move the resulting files so they can be compared later.
    c. Full sync from A to B and allow it to finish.
    d. Full sync from B to A and allow it to finish.
    e. Run 'nomall' again. There should be no differences; keys that were not found should now be found on both sides.
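Steps 1b to 1d can be scripted with riak-repl, the replication command-line tool shipped with riak_ee 1.4 (version 3 replication). This is a minimal sketch, not the exact commands used for this issue: the repl and setup_replication helpers are hypothetical, the devrel is assumed to live in ./dev with dev1-3 and dev4-6 already joined into two clusters, and the cluster manager ports (10016 for dev1, 10046 for dev4) are an assumption; check the cluster_mgr setting in each node's app.config.

```bash
#!/bin/bash
# Hypothetical helper: run riak-repl on devrel node N, e.g. "repl 1 status".
# ASSUMPTION: the devrel lives in ./dev (override with DEVREL=...).
repl() {
    local node=$1; shift
    "${DEVREL:-dev}/dev$node/bin/riak-repl" "$@"
}

setup_replication() {
    # Step 1b: name the clusters (one node per cluster is enough).
    repl 1 clustername A
    repl 4 clustername B
    # Connect each cluster to the other's cluster manager.
    # ASSUMPTION: cluster manager ports are 10016 (dev1) and 10046 (dev4).
    repl 1 connect 127.0.0.1:10046
    repl 4 connect 127.0.0.1:10016
    # Step 1c: enable and start realtime in both directions.
    repl 1 realtime enable B && repl 1 realtime start B
    repl 4 realtime enable A && repl 4 realtime start A
    # Step 1d: enable, but do not start, full sync in both directions.
    repl 1 fullsync enable B
    repl 4 fullsync enable A
}
```

Steps 3d and 4c would then be `repl 1 fullsync start B`, and step 4d `repl 4 fullsync start A`.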

basho_bench patch

diff --git a/src/basho_bench_driver_riakc_pb.erl b/src/basho_bench_driver_riakc_pb.erl
index 7522ce5..221aa46 100644
--- a/src/basho_bench_driver_riakc_pb.erl
+++ b/src/basho_bench_driver_riakc_pb.erl
@@ -145,7 +145,9 @@ warn_bucket_mr_correctness(_) ->

 run(get, KeyGen, _ValueGen, State) ->
     Key = KeyGen(),
-    case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,
+    Bucket = oneof(State#state.bucket),
+    ?DEBUG("run get: ~p ~p", [Key, Bucket]),
+    case riakc_pb_socket:get(State#state.pid, Bucket, Key,
                              [{r, State#state.r}], State#state.timeout_read) of
         {ok, _} ->
             {ok, State};
@@ -156,7 +158,9 @@ run(get, KeyGen, _ValueGen, State) ->
     end;
 run(get_existing, KeyGen, _ValueGen, State) ->
     Key = KeyGen(),
-    case riakc_pb_socket:get(State#state.pid, State#state.bucket, Key,
+    Bucket = oneof(State#state.bucket),
+    ?DEBUG("run get_existing: ~p ~p", [Key, Bucket]),
+    case riakc_pb_socket:get(State#state.pid, Bucket, Key,
                              [{r, State#state.r}], State#state.timeout_read) of
         {ok, _} ->
             {ok, State};
@@ -166,7 +170,10 @@ run(get_existing, KeyGen, _ValueGen, State) ->
             {error, Reason, State}
     end;
 run(put, KeyGen, ValueGen, State) ->
-    Robj0 = riakc_obj:new(State#state.bucket, KeyGen()),
+    Key = KeyGen(),
+    Bucket = oneof(State#state.bucket),
+    ?DEBUG("run put: ~p ~p", [Key, Bucket]),
+    Robj0 = riakc_obj:new(Bucket, Key),
     Robj = riakc_obj:update_value(Robj0, ValueGen()),
     case riakc_pb_socket:put(State#state.pid, Robj, [{w, State#state.w},
                                                      {dw, State#state.dw}], State#state.timeout_write) of
@@ -177,7 +184,9 @@ run(put, KeyGen, ValueGen, State) ->
     end;
 run(update, KeyGen, ValueGen, State) ->
     Key = KeyGen(),
-    case riakc_pb_socket:get(State#state.pid, State#state.bucket,
+    Bucket = oneof(State#state.bucket),
+    ?DEBUG("run update: ~p ~p", [Key, Bucket]),
+    case riakc_pb_socket:get(State#state.pid, Bucket,
                              Key, [{r, State#state.r}], State#state.timeout_read) of
         {ok, Robj} ->
             Robj2 = riakc_obj:update_value(Robj, ValueGen()),
@@ -189,7 +198,7 @@ run(update, KeyGen, ValueGen, State) ->
                     {error, Reason, State}
             end;
         {error, notfound} ->
-            Robj0 = riakc_obj:new(State#state.bucket, Key),
+            Robj0 = riakc_obj:new(Bucket, Key),
             Robj = riakc_obj:update_value(Robj0, ValueGen()),
             case riakc_pb_socket:put(State#state.pid, Robj, [{w, State#state.w},
                                                              {dw, State#state.dw}], State#state.timeout_write) of
@@ -203,7 +212,9 @@ run(update, KeyGen, ValueGen, State) ->
     end;
 run(update_existing, KeyGen, ValueGen, State) ->
     Key = KeyGen(),
-    case riakc_pb_socket:get(State#state.pid, State#state.bucket,
+    Bucket = oneof(State#state.bucket),
+    ?DEBUG("run update_existing: ~p ~p", [Key, Bucket]),
+    case riakc_pb_socket:get(State#state.pid, Bucket,
                              Key, [{r, State#state.r}], State#state.timeout_read) of
         {ok, Robj} ->
             Robj2 = riakc_obj:update_value(Robj, ValueGen()),
@@ -221,7 +232,10 @@ run(update_existing, KeyGen, ValueGen, State) ->
     end;
 run(delete, KeyGen, _ValueGen, State) ->
     %% Pass on rw
-    case riakc_pb_socket:delete(State#state.pid, State#state.bucket, KeyGen(),
+    Key = KeyGen(),
+    Bucket = oneof(State#state.bucket),
+    ?DEBUG("run delete: ~p ~p", [Key, Bucket]),
+    case riakc_pb_socket:delete(State#state.pid, Bucket, Key,
                                 [{rw, State#state.rw}], State#state.timeout_write) of
         ok ->
             {ok, State};
@@ -232,7 +246,9 @@ run(delete, KeyGen, _ValueGen, State) ->
     end;
 run(listkeys, _KeyGen, _ValueGen, State) ->
     %% Pass on rw
-    case riakc_pb_socket:list_keys(State#state.pid, State#state.bucket, State#state.timeout_listkeys) of
+    Bucket = oneof(State#state.bucket),
+    ?INFO("run listkeys: ~p", [Bucket]),
+    case riakc_pb_socket:list_keys(State#state.pid, Bucket, State#state.timeout_listkeys) of
         {ok, _Keys} ->
             {ok, State};
         {error, Reason} ->
@@ -267,21 +283,21 @@ run(search_interval, _KeyGen, _ValueGen, #state{search_queries=SearchQs, start_t
               {error, Reason, NewState}
     end;
 run(mr_bucket_erlang, _KeyGen, _ValueGen, State) ->
-    mapred(State, State#state.bucket, ?ERLANG_MR);
+    mapred(State, oneof(State#state.bucket), ?ERLANG_MR);
 run(mr_bucket_js, _KeyGen, _ValueGen, State) ->
-    mapred(State, State#state.bucket, ?JS_MR);
+    mapred(State, oneof(State#state.bucket), ?JS_MR);
 run(mr_keylist_erlang, KeyGen, _ValueGen, State) ->
-    Keylist = make_keylist(State#state.bucket, KeyGen,
+    Keylist = make_keylist(oneof(State#state.bucket), KeyGen,
                            State#state.keylist_length),
     mapred(State, Keylist, ?ERLANG_MR);
 run(mr_keylist_js, KeyGen, _ValueGen, State) ->
-    Keylist = make_keylist(State#state.bucket, KeyGen,
+    Keylist = make_keylist(oneof(State#state.bucket), KeyGen,
                            State#state.keylist_length),
     mapred(State, Keylist, ?JS_MR);
 run(counter_incr, KeyGen, ValueGen, State) ->
     Amt = ValueGen(),
     Key = KeyGen(),
-    case riakc_pb_socket:counter_incr(State#state.pid, State#state.bucket, Key, Amt,
+    case riakc_pb_socket:counter_incr(State#state.pid, oneof(State#state.bucket), Key, Amt,
                                       [{w, State#state.w},
                                        {dw, State#state.dw},
                                        {pw, State#state.pw}]) of
@@ -292,7 +308,7 @@ run(counter_incr, KeyGen, ValueGen, State) ->
     end;
 run(counter_val, KeyGen, _ValueGen, State) ->
     Key = KeyGen(),
-    case riakc_pb_socket:counter_val(State#state.pid, State#state.bucket, Key,
+    case riakc_pb_socket:counter_val(State#state.pid, oneof(State#state.bucket), Key,
                                      [{r, State#state.r}, {pr, State#state.pr}]) of
         {ok, _N} ->
             {ok, State};
@@ -404,3 +420,10 @@ get_timeout(Name) when Name == pb_timeout_read;

 get_connect_options() ->
     basho_bench_config:get(pb_connect_options, [{auto_reconnect, true}]).
+
+oneof(List) when is_list(List) ->
+    Nth = random:uniform(length(List)),
+    lists:nth(Nth, List);
+
+oneof(NotList) ->
+    NotList.

loadit.config

{mode, max}.

{log_level, info}.

{duration, 10}.

{concurrent, 10}.

{driver, basho_bench_driver_riakc_pb}.

{key_generator, {to_binstr, "~p", {uniform_int, 10000, 19999}}}.

{value_generator, {fixed_bin, 1000, <<"data">>}}.

{riakc_pb_ips, [
    % for cluster A
    {{127,0,0,1}, [10017, 10027, 10037]}
    % for cluster B, comment out above and uncomment below
    %{{127,0,0,1}, [10047, 10057, 10067]}
]}.

{riakc_pb_replies, 1}.
{riakc_pb_bucket, [<<"b1">>, <<"b2">>, <<"b3">>]}.

{operations, [{update, 1}, {put, 1}, {delete, 1}]}.
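The two per-cluster configs, and the new key range needed in step 3a, can be derived from the unified loadit.config above. This is a minimal sketch, assuming the template's riakc_pb_ips lines look exactly as shown; make_config and the loadit-A.config / loadit-B.config output names are hypothetical.

```bash
#!/bin/bash
# make_config CLUSTER LOW HIGH [TEMPLATE]
# Hypothetical helper: writes loadit-CLUSTER.config from the unified template,
# setting the uniform_int key range to LOW..HIGH and selecting the cluster's ports.
make_config() {
    local cluster=$1 low=$2 high=$3 template=${4:-loadit.config}
    local out="loadit-$cluster.config"
    case "$cluster" in
        A)  # The template already targets cluster A; only the key range changes.
            sed -e "s/{uniform_int, [0-9]*, [0-9]*}/{uniform_int, $low, $high}/" \
                "$template" > "$out" ;;
        B)  # Also comment out cluster A's ports and uncomment cluster B's.
            sed -e "s/{uniform_int, [0-9]*, [0-9]*}/{uniform_int, $low, $high}/" \
                -e 's/^\( *\)\({{127,0,0,1}, \[10017\)/\1%\2/' \
                -e 's/^\( *\)%\({{127,0,0,1}, \[10047\)/\1\2/' \
                "$template" > "$out" ;;
        *)  echo "make_config: unknown cluster '$cluster'" >&2; return 1 ;;
    esac
}
```

For step 3a, something like `make_config A 20000 29999` and `make_config B 20000 29999`, then `./basho_bench loadit-A.config` and `./basho_bench loadit-B.config` run side by side. Both clusters get the same range so that the same keys are written on both sides; nomall's `seq` range would need to be widened to match.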

nomall

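#! /bin/bash
# nomall: for every bucket/key in the load's key range, record whether cluster A
# (dev1, HTTP port 10018) and cluster B (dev4, HTTP port 10048) can find it.
# Results are appended to ./ca and ./cb as "bucket key status".

# Print "found", "not found", or "error <code>" for one key on one node.
lookup() {
    local code
    code=$(curl -sS -o /dev/null -w '%{http_code}' "http://127.0.0.1:$1/riak/$2/$3")
    case "$code" in
        200|300) echo "found" ;;        # 300 means siblings; the key still exists
        404)     echo "not found" ;;
        *)       echo "error $code" ;;  # e.g. a connection failure, which is not "found"
    esac
}

for b in "b1" "b2" "b3"; do
    for i in $(seq 10000 19999); do
        echo "$b $i $(lookup 10018 "$b" "$i")" >> ca
        echo "$b $i $(lookup 10048 "$b" "$i")" >> cb
        echo -n "."
    done
done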
echo "";
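The diff step can be made easier to read by pairing up the two files by bucket and key. This is a minimal sketch, assuming ca and cb come from a single nomall run (so each bucket/key appears once in each file); compare_nomall is a hypothetical helper.

```bash
#!/bin/bash
# compare_nomall FILE_A FILE_B
# Hypothetical helper: prints "bucket key | A: status | B: status" for every key whose
# status differs between the two nomall output files, then the number of such keys.
compare_nomall() {
    awk '
        # First file: remember the status (everything after bucket and key).
        NR == FNR { k = $1 " " $2; $1 = $2 = ""; sub(/^ +/, ""); a[k] = $0; next }
        # Second file: report keys whose status differs from the first file.
        {
            k = $1 " " $2; $1 = $2 = ""; sub(/^ +/, "")
            if (a[k] != $0) { print k " | A: " a[k] " | B: " $0; n++ }
        }
        END { print n + 0 " differing keys" }
    ' "$1" "$2"
}
```

Running `compare_nomall ca cb` after step 4b lists the keys that differ; after step 4e the count should be 0.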
