basho / riak (http://riak.basho.com/)

Riak is a decentralized datastore from Basho Technologies.

Clone this repository (size: 12.5 MB): HTTPS / SSH
$ hg clone http://hg.basho.com/riak
commit 389: 93750f3fbbe2
parent 388: 05449b89c6c0
branch: default
tags: riak-0.6.1
moving vclock pruning from FSM into vclock module and pushing it to backend-storage time
Justin Sheehy / justin
10 months ago

Changed (Δ6.7 KB):

raw changeset »

src/riak_get_fsm.erl (6 lines added, 4 lines removed)

src/riak_put_fsm.erl (3 lines added, 86 lines removed)

src/riak_vnode.erl (11 lines added, 4 lines removed)

src/riak_vnode_master.erl (2 lines added, 2 lines removed)

src/vclock.erl (74 lines added, 1 line removed)

Up to file-list src/riak_get_fsm.erl:

@@ -170,12 +170,14 @@ finalize(StateData=#state{final_obj=Fina
170
170
                          bkey=BKey,
171
171
                          req_id=ReqId,
172
172
                          replied_notfound=NotFound,
173
                          ring=Ring}) ->
173
                          ring=Ring,
174
                          starttime=StartTime}) ->
174
175
    case Final of
175
176
        {error, notfound} ->
176
177
            maybe_finalize_delete(StateData);
177
178
        {ok,_} ->
178
            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId);
179
            maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,
180
                                 ReqId,StartTime);
179
181
        _ -> nop
180
182
    end,
181
183
    {stop,normal,StateData}.
@@ -208,11 +210,11 @@ maybe_finalize_delete(_StateData=#state{
208
210
    end
209
211
    end).
210
212
211
maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId) ->
213
maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId,StartTime) ->
212
214
    Targets0 = ancestor_indices(Final, RepliedR) ++ NotFound,
213
215
    Targets = [{Idx,riak_ring:index_owner(Ring,Idx)} || Idx <- Targets0],
214
216
    {ok, FinalRObj} = Final,
215
    Msg = {self(), BKey, FinalRObj, ReqId},
217
    Msg = {self(), BKey, FinalRObj, ReqId, StartTime},
216
218
    case Targets of
217
219
        [] -> nop;
218
220
        _ ->

Up to file-list src/riak_put_fsm.erl:

@@ -52,15 +52,15 @@ init([ReqId,RObj0,W,DW,Timeout,Client])
52
52
%% @private
53
53
initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId,
54
54
                                      timeout=Timeout, ring=Ring}) ->
55
    RObj = update_metadata(RObj0),
55
56
    RealStartTime = riak_util:moment(),
56
    Bucket = riak_object:bucket(RObj0),
57
    Bucket = riak_object:bucket(RObj),
57
58
    BucketProps = riak_bucket:get_bucket(Bucket, Ring),
58
    RObj = prune_vclock(update_metadata(RObj0),BucketProps),
59
59
    Key = riak_object:key(RObj),
60
60
    riak_eventer:notify(riak_put_fsm, put_fsm_start,
61
61
                        {ReqId, RealStartTime, Bucket, Key}),
62
62
    DocIdx = riak_util:chash_key({Bucket, Key}),
63
    Msg = {self(), {Bucket,Key}, RObj, ReqId},
63
    Msg = {self(), {Bucket,Key}, RObj, ReqId, RealStartTime},
64
64
    N = proplists:get_value(n_val,BucketProps),
65
65
    Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N),
66
66
    {Targets, Fallbacks} = lists:split(N, Preflist),
@@ -194,93 +194,10 @@ update_metadata(RObj) ->
194
194
    end,
195
195
    riak_object:apply_updates(riak_object:update_metadata(RObj, NewMD)).
196
196
197
prune_vclock(RObj,BucketProps) ->
198
    % This function is a little bit evil, as it relies on the
199
    % internal structure of vclocks.
200
    % That structure being [{Id, {Vsn, Timestamp}}]
201
    V = riak_object:vclock(RObj),
202
    SortV = lists:sort(fun({_,{_,A}},{_,{_,B}}) -> A < B end, V),
203
    Now = calendar:datetime_to_gregorian_seconds(erlang:universaltime()),
204
    case prune_vclock1(Now,SortV,BucketProps,no_change) of
205
        {no_change, _} -> RObj;
206
        {pruned, NewV} -> riak_object:set_vclock(RObj,NewV)
207
    end.
208
209
prune_vclock1(Now,V,BProps,Changed) ->
210
    case length(V) =< proplists:get_value(small_vclock,BProps) of
211
        true -> {Changed, V};
212
        false ->
213
            {_,{_,HeadTime}} = hd(V),
214
            case (Now - HeadTime) < proplists:get_value(young_vclock,BProps) of
215
                true -> {Changed, V};
216
                false -> prune_vclock1(Now,V,BProps,Changed,HeadTime)
217
            end
218
    end.
219
prune_vclock1(Now,V,BProps,Changed,HeadTime) ->
220
    % has a precondition that V is longer than small and older than young
221
    case length(V) > proplists:get_value(big_vclock,BProps) of
222
        true -> prune_vclock1(Now,tl(V),BProps,pruned);
223
        false ->
224
            case (Now - HeadTime) > proplists:get_value(old_vclock,BProps) of
225
                true -> prune_vclock1(Now,tl(V),BProps,pruned);
226
                false -> {Changed, V}
227
            end
228
    end.
229
230
197
make_vtag(RObj) ->
231
198
    <<HashAsNum:128/integer>> = crypto:md5(term_to_binary(riak_object:vclock(RObj))),
232
199
    riak_util:integer_to_list(HashAsNum,62).
233
200
234
% following two are just utility functions for test assist
235
vc_obj(VC) -> riak_object:set_vclock(riak_object:new(<<"b">>,<<"k">>,<<"v">>), VC).
236
obj_vc(OB) -> riak_object:vclock(OB).
237
238
prune_small_vclock_test() ->
239
    % vclock with less entries than small_vclock will be untouched
240
    OldTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
241
               - 32000000,
242
    SmallVC = [{<<"1">>, {1, OldTime}},
243
               {<<"2">>, {2, OldTime}},
244
               {<<"3">>, {3, OldTime}}],
245
    Props = [{small_vclock,4}],
246
    ?assertEqual(SmallVC, obj_vc(prune_vclock(vc_obj(SmallVC), Props))).
247
248
prune_young_vclock_test() ->
249
    % vclock with all entries younger than young_vclock will be untouched
250
    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
251
               - 1,
252
    VC = [{<<"1">>, {1, NewTime}},
253
          {<<"2">>, {2, NewTime}},
254
          {<<"3">>, {3, NewTime}}],
255
    Props = [{small_vclock,1},{young_vclock,1000}],
256
    ?assertEqual(VC, obj_vc(prune_vclock(vc_obj(VC), Props))).
257
258
prune_big_vclock_test() ->
259
    % vclock not preserved by small or young will be pruned down to
260
    % no larger than big_vclock entries
261
    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
262
               - 1000,
263
    VC = [{<<"1">>, {1, NewTime}},
264
          {<<"2">>, {2, NewTime}},
265
          {<<"3">>, {3, NewTime}}],
266
    Props = [{small_vclock,1},{young_vclock,1},
267
             {big_vclock,2},{old_vclock,100000}],
268
    ?assert(length(obj_vc(prune_vclock(vc_obj(VC), Props))) =:= 2).
269
270
prune_old_vclock_test() ->
271
    % vclock not preserved by small or young will be pruned down to
272
    % no larger than big_vclock and no entries more than old_vclock ago
273
    NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
274
               - 1000,
275
    OldTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime())
276
               - 100000,    
277
    VC = [{<<"1">>, {1, NewTime}},
278
          {<<"2">>, {2, OldTime}},
279
          {<<"3">>, {3, OldTime}}],
280
    Props = [{small_vclock,1},{young_vclock,1},
281
             {big_vclock,2},{old_vclock,10000}],
282
    ?assert(length(obj_vc(prune_vclock(vc_obj(VC), Props))) =:= 1).
283
284
201
make_vtag_test() ->
285
202
    Obj = riak_object:new(<<"b">>,<<"k">>,<<"v1">>),
286
203
    ?assertNot(make_vtag(Obj) =:= 

Up to file-list src/riak_vnode.erl:

@@ -75,11 +75,11 @@ handle_cast({map, ClientPid, QTerm, BKey
75
75
    VNode = self(),
76
76
    do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,VNode),
77
77
    {noreply, State};
78
handle_cast({put, FSM_pid, BKey, RObj, ReqID},
78
handle_cast({put, FSM_pid, BKey, RObj, ReqID, FSMTime},
79
79
            State=#state{mapcache=Cache,idx=Idx}) ->
80
80
    riak_eventer:notify(riak_vnode, put, {ReqID, Idx}),
81
81
    gen_fsm:send_event(FSM_pid, {w, Idx, ReqID}),
82
    do_put(FSM_pid, BKey, RObj, ReqID, State),
82
    do_put(FSM_pid, BKey, RObj, ReqID, FSMTime, State),
83
83
    {noreply, State#state{mapcache=dict:erase(BKey,Cache)}};
84
84
handle_cast({get, FSM_pid, BKey, ReqID}, State=#state{idx=Idx}) ->
85
85
    riak_eventer:notify(riak_vnode, get, {ReqID, Idx}),
@@ -149,13 +149,19 @@ do_delete(Client, BKey, ReqID,
149
149
simple_binary_put(BKey, Val, Mod, ModState) ->
150
150
    Mod:put(ModState, BKey, Val).
151
151
152
do_put(FSM_pid, BKey, RObj, ReqID,
152
do_put(FSM_pid, BKey, RObj, ReqID, PruneTime, 
153
153
       _State=#state{idx=Idx,mod=Mod,modstate=ModState}) ->
154
    {ok,Ring} = riak_ring_manager:get_my_ring(),    
155
    {Bucket,_Key} = BKey,
156
    BProps = riak_bucket:get_bucket(Bucket, Ring),
154
157
    case syntactic_put_merge(Mod, ModState, BKey, RObj, ReqID) of
155
158
        oldobj -> 
156
159
            riak_eventer:notify(riak_vnode,put_reply,ReqID),
157
160
            gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID});
158
        {newobj, ObjToStore} ->
161
        {newobj, NewObj} ->
162
            VC = riak_object:vclock(NewObj),
163
            ObjToStore = riak_object:set_vclock(NewObj,
164
                                           vclock:prune(VC,PruneTime,BProps)),
159
165
            Val = term_to_binary(ObjToStore, [compressed]),
160
166
            case simple_binary_put(BKey, Val, Mod, ModState) of
161
167
                ok ->
@@ -280,3 +286,4 @@ syntactic_put_merge(Mod, ModState, BKey,
280
286
                false -> {newobj, ResObj}
281
287
            end    
282
288
    end.
289

Up to file-list src/riak_vnode_master.erl:

@@ -37,9 +37,9 @@ handle_cast({vnode_map, {Partition,_Node
37
37
    % (obligation done, now the problem of the vnodes)
38
38
    {noreply, State};
39
39
handle_cast({vnode_put, {Partition,_Node},
40
             {FSM_pid,BKey,RObj,ReqID}}, State) ->
40
             {FSM_pid,BKey,RObj,ReqID,FSMTime}}, State) ->
41
41
    Pid = get_vnode(Partition, State),
42
    gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID}),
42
    gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID, FSMTime}),
43
43
    % (obligation done, now the problem of the vnodes)
44
44
    {noreply, State};
45
45
handle_cast({vnode_get, {Partition,_Node},

Up to file-list src/vclock.erl:

29
29
-author('Andy Gross <andy@basho.com>').
30
30
31
31
-export([fresh/0,descends/2,merge/1,get_counter/2,get_timestamp/2,
32
	increment/2,all_nodes/1,equal/2]).
32
	increment/2,all_nodes/1,equal/2,prune/3]).
33
33
34
34
-include_lib("eunit/include/eunit.hrl").
35
35
@@ -167,3 +167,76 @@ equal(VA,VB) ->
167
167
                false -> true
168
168
            end
169
169
    end.
170
171
% @doc Possibly shrink the size of a vclock, depending on current age and size.
172
% @spec prune(V::vclock(), Now::integer(), BucketProps::term()) -> vclock()
173
prune(V,Now,BucketProps) ->
174
    SortV = lists:sort(fun({_,{_,A}},{_,{_,B}}) -> A < B end, V),
175
    prune_vclock1(SortV,Now,BucketProps).
176
% @private
177
prune_vclock1(V,Now,BProps) ->
178
    case length(V) =< proplists:get_value(small_vclock,BProps) of
179
        true -> V;
180
        false ->
181
            {_,{_,HeadTime}} = hd(V),
182
            case (Now - HeadTime) < proplists:get_value(young_vclock,BProps) of
183
                true -> V;
184
                false -> prune_vclock1(V,Now,BProps,HeadTime)
185
            end
186
    end.
187
% @private
188
prune_vclock1(V,Now,BProps,HeadTime) ->
189
    % has a precondition that V is longer than small and older than young
190
    case length(V) > proplists:get_value(big_vclock,BProps) of
191
        true -> prune_vclock1(tl(V),Now,BProps);
192
        false ->
193
            case (Now - HeadTime) > proplists:get_value(old_vclock,BProps) of
194
                true -> prune_vclock1(tl(V),Now,BProps);
195
                false -> V
196
            end
197
    end.
198
199
prune_small_test() ->
200
    % vclock with less entries than small_vclock will be untouched
201
    Now = riak_util:moment(),
202
    OldTime = Now - 32000000,
203
    SmallVC = [{<<"1">>, {1, OldTime}},
204
               {<<"2">>, {2, OldTime}},
205
               {<<"3">>, {3, OldTime}}],
206
    Props = [{small_vclock,4}],
207
    ?assertEqual(lists:sort(SmallVC), lists:sort(prune(SmallVC, Now, Props))).
208
209
prune_young_test() ->
210
    % vclock with all entries younger than young_vclock will be untouched
211
    Now = riak_util:moment(),
212
    NewTime = Now - 1,
213
    VC = [{<<"1">>, {1, NewTime}},
214
          {<<"2">>, {2, NewTime}},
215
          {<<"3">>, {3, NewTime}}],
216
    Props = [{small_vclock,1},{young_vclock,1000}],
217
    ?assertEqual(lists:sort(VC), lists:sort(prune(VC, Now, Props))).
218
219
prune_big_test() ->
220
    % vclock not preserved by small or young will be pruned down to
221
    % no larger than big_vclock entries
222
    Now = riak_util:moment(),
223
    NewTime = Now - 1000,
224
    VC = [{<<"1">>, {1, NewTime}},
225
          {<<"2">>, {2, NewTime}},
226
          {<<"3">>, {3, NewTime}}],
227
    Props = [{small_vclock,1},{young_vclock,1},
228
             {big_vclock,2},{old_vclock,100000}],
229
    ?assert(length(prune(VC, Now, Props)) =:= 2).
230
231
prune_old_test() ->
232
    % vclock not preserved by small or young will be pruned down to
233
    % no larger than big_vclock and no entries more than old_vclock ago
234
    Now = riak_util:moment(),
235
    NewTime = Now - 1000,
236
    OldTime = Now - 100000,    
237
    VC = [{<<"1">>, {1, NewTime}},
238
          {<<"2">>, {2, OldTime}},
239
          {<<"3">>, {3, OldTime}}],
240
    Props = [{small_vclock,1},{young_vclock,1},
241
             {big_vclock,2},{old_vclock,10000}],
242
    ?assert(length(prune(VC, Now, Props)) =:= 1).