| commit 389: | 93750f3fbbe2 |
| parent 388: | 05449b89c6c0 |
| branch: | default |
| tags: | riak-0.6.1 |
Move vclock pruning out of the put FSM into the vclock module, and defer it until the object is written to backend storage.
Changed (Δ6.7 KB):
raw changeset »
src/riak_get_fsm.erl (6 lines added, 4 lines removed)
src/riak_put_fsm.erl (3 lines added, 86 lines removed)
src/riak_vnode.erl (11 lines added, 4 lines removed)
src/riak_vnode_master.erl (2 lines added, 2 lines removed)
src/vclock.erl (74 lines added, 1 lines removed)
Up to file-list src/riak_get_fsm.erl:
| … | … | @@ -170,12 +170,14 @@ finalize(StateData=#state{final_obj=Fina |
170 |
170 |
bkey=BKey, |
171 |
171 |
req_id=ReqId, |
172 |
172 |
replied_notfound=NotFound, |
173 |
ring=Ring |
|
173 |
ring=Ring, |
|
174 |
starttime=StartTime}) -> |
|
174 |
175 |
case Final of |
175 |
176 |
{error, notfound} -> |
176 |
177 |
maybe_finalize_delete(StateData); |
177 |
178 |
{ok,_} -> |
178 |
maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey, |
|
179 |
maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey, |
|
180 |
ReqId,StartTime); |
|
179 |
181 |
_ -> nop |
180 |
182 |
end, |
181 |
183 |
{stop,normal,StateData}. |
| … | … | @@ -208,11 +210,11 @@ maybe_finalize_delete(_StateData=#state{ |
208 |
210 |
end |
209 |
211 |
end). |
210 |
212 |
|
211 |
maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId |
|
213 |
maybe_do_read_repair(Ring,Final,RepliedR,NotFound,BKey,ReqId,StartTime) -> |
|
212 |
214 |
Targets0 = ancestor_indices(Final, RepliedR) ++ NotFound, |
213 |
215 |
Targets = [{Idx,riak_ring:index_owner(Ring,Idx)} || Idx <- Targets0], |
214 |
216 |
{ok, FinalRObj} = Final, |
215 |
Msg = {self(), BKey, FinalRObj, ReqId |
|
217 |
Msg = {self(), BKey, FinalRObj, ReqId, StartTime}, |
|
216 |
218 |
case Targets of |
217 |
219 |
[] -> nop; |
218 |
220 |
_ -> |
Up to file-list src/riak_put_fsm.erl:
| … | … | @@ -52,15 +52,15 @@ init([ReqId,RObj0,W,DW,Timeout,Client]) |
52 |
52 |
%% @private |
53 |
53 |
initialize(timeout, StateData0=#state{robj=RObj0, req_id=ReqId, |
54 |
54 |
timeout=Timeout, ring=Ring}) -> |
55 |
RObj = update_metadata(RObj0), |
|
55 |
56 |
RealStartTime = riak_util:moment(), |
56 |
Bucket = riak_object:bucket(RObj |
|
57 |
Bucket = riak_object:bucket(RObj), |
|
57 |
58 |
BucketProps = riak_bucket:get_bucket(Bucket, Ring), |
58 |
RObj = prune_vclock(update_metadata(RObj0),BucketProps), |
|
59 |
59 |
Key = riak_object:key(RObj), |
60 |
60 |
riak_eventer:notify(riak_put_fsm, put_fsm_start, |
61 |
61 |
{ReqId, RealStartTime, Bucket, Key}), |
62 |
62 |
DocIdx = riak_util:chash_key({Bucket, Key}), |
63 |
Msg = {self(), {Bucket,Key}, RObj, ReqId |
|
63 |
Msg = {self(), {Bucket,Key}, RObj, ReqId, RealStartTime}, |
|
64 |
64 |
N = proplists:get_value(n_val,BucketProps), |
65 |
65 |
Preflist = riak_ring:filtered_preflist(DocIdx, Ring, N), |
66 |
66 |
{Targets, Fallbacks} = lists:split(N, Preflist), |
| … | … | @@ -194,93 +194,10 @@ update_metadata(RObj) -> |
194 |
194 |
end, |
195 |
195 |
riak_object:apply_updates(riak_object:update_metadata(RObj, NewMD)). |
196 |
196 |
|
197 |
prune_vclock(RObj,BucketProps) -> |
|
198 |
% This function is a little bit evil, as it relies on the |
|
199 |
% internal structure of vclocks. |
|
200 |
% That structure being [{Id, {Vsn, Timestamp}}] |
|
201 |
V = riak_object:vclock(RObj), |
|
202 |
SortV = lists:sort(fun({_,{_,A}},{_,{_,B}}) -> A < B end, V), |
|
203 |
Now = calendar:datetime_to_gregorian_seconds(erlang:universaltime()), |
|
204 |
case prune_vclock1(Now,SortV,BucketProps,no_change) of |
|
205 |
{no_change, _} -> RObj; |
|
206 |
{pruned, NewV} -> riak_object:set_vclock(RObj,NewV) |
|
207 |
end. |
|
208 |
||
209 |
prune_vclock1(Now,V,BProps,Changed) -> |
|
210 |
case length(V) =< proplists:get_value(small_vclock,BProps) of |
|
211 |
true -> {Changed, V}; |
|
212 |
false -> |
|
213 |
{_,{_,HeadTime}} = hd(V), |
|
214 |
case (Now - HeadTime) < proplists:get_value(young_vclock,BProps) of |
|
215 |
true -> {Changed, V}; |
|
216 |
false -> prune_vclock1(Now,V,BProps,Changed,HeadTime) |
|
217 |
end |
|
218 |
end. |
|
219 |
prune_vclock1(Now,V,BProps,Changed,HeadTime) -> |
|
220 |
% has a precondition that V is longer than small and older than young |
|
221 |
case length(V) > proplists:get_value(big_vclock,BProps) of |
|
222 |
true -> prune_vclock1(Now,tl(V),BProps,pruned); |
|
223 |
false -> |
|
224 |
case (Now - HeadTime) > proplists:get_value(old_vclock,BProps) of |
|
225 |
true -> prune_vclock1(Now,tl(V),BProps,pruned); |
|
226 |
false -> {Changed, V} |
|
227 |
end |
|
228 |
end. |
|
229 |
||
230 |
197 |
make_vtag(RObj) -> |
231 |
198 |
<<HashAsNum:128/integer>> = crypto:md5(term_to_binary(riak_object:vclock(RObj))), |
232 |
199 |
riak_util:integer_to_list(HashAsNum,62). |
233 |
200 |
|
234 |
% following two are just utility functions for test assist |
|
235 |
vc_obj(VC) -> riak_object:set_vclock(riak_object:new(<<"b">>,<<"k">>,<<"v">>), VC). |
|
236 |
obj_vc(OB) -> riak_object:vclock(OB). |
|
237 |
||
238 |
prune_small_vclock_test() -> |
|
239 |
% vclock with less entries than small_vclock will be untouched |
|
240 |
OldTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime()) |
|
241 |
- 32000000, |
|
242 |
SmallVC = [{<<"1">>, {1, OldTime}}, |
|
243 |
{<<"2">>, {2, OldTime}}, |
|
244 |
{<<"3">>, {3, OldTime}}], |
|
245 |
Props = [{small_vclock,4}], |
|
246 |
?assertEqual(SmallVC, obj_vc(prune_vclock(vc_obj(SmallVC), Props))). |
|
247 |
||
248 |
prune_young_vclock_test() -> |
|
249 |
% vclock with all entries younger than young_vclock will be untouched |
|
250 |
NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime()) |
|
251 |
- 1, |
|
252 |
VC = [{<<"1">>, {1, NewTime}}, |
|
253 |
{<<"2">>, {2, NewTime}}, |
|
254 |
{<<"3">>, {3, NewTime}}], |
|
255 |
Props = [{small_vclock,1},{young_vclock,1000}], |
|
256 |
?assertEqual(VC, obj_vc(prune_vclock(vc_obj(VC), Props))). |
|
257 |
||
258 |
prune_big_vclock_test() -> |
|
259 |
% vclock not preserved by small or young will be pruned down to |
|
260 |
% no larger than big_vclock entries |
|
261 |
NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime()) |
|
262 |
- 1000, |
|
263 |
VC = [{<<"1">>, {1, NewTime}}, |
|
264 |
{<<"2">>, {2, NewTime}}, |
|
265 |
{<<"3">>, {3, NewTime}}], |
|
266 |
Props = [{small_vclock,1},{young_vclock,1}, |
|
267 |
{big_vclock,2},{old_vclock,100000}], |
|
268 |
?assert(length(obj_vc(prune_vclock(vc_obj(VC), Props))) =:= 2). |
|
269 |
||
270 |
prune_old_vclock_test() -> |
|
271 |
% vclock not preserved by small or young will be pruned down to |
|
272 |
% no larger than big_vclock and no entries more than old_vclock ago |
|
273 |
NewTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime()) |
|
274 |
- 1000, |
|
275 |
OldTime = calendar:datetime_to_gregorian_seconds(erlang:universaltime()) |
|
276 |
- 100000, |
|
277 |
VC = [{<<"1">>, {1, NewTime}}, |
|
278 |
{<<"2">>, {2, OldTime}}, |
|
279 |
{<<"3">>, {3, OldTime}}], |
|
280 |
Props = [{small_vclock,1},{young_vclock,1}, |
|
281 |
{big_vclock,2},{old_vclock,10000}], |
|
282 |
?assert(length(obj_vc(prune_vclock(vc_obj(VC), Props))) =:= 1). |
|
283 |
||
284 |
201 |
make_vtag_test() -> |
285 |
202 |
Obj = riak_object:new(<<"b">>,<<"k">>,<<"v1">>), |
286 |
203 |
?assertNot(make_vtag(Obj) =:= |
Up to file-list src/riak_vnode.erl:
| … | … | @@ -75,11 +75,11 @@ handle_cast({map, ClientPid, QTerm, BKey |
75 |
75 |
VNode = self(), |
76 |
76 |
do_map(ClientPid,QTerm,BKey,KeyData,Cache,Mod,ModState,VNode), |
77 |
77 |
{noreply, State}; |
78 |
handle_cast({put, FSM_pid, BKey, RObj, ReqID |
|
78 |
handle_cast({put, FSM_pid, BKey, RObj, ReqID, FSMTime}, |
|
79 |
79 |
State=#state{mapcache=Cache,idx=Idx}) -> |
80 |
80 |
riak_eventer:notify(riak_vnode, put, {ReqID, Idx}), |
81 |
81 |
gen_fsm:send_event(FSM_pid, {w, Idx, ReqID}), |
82 |
do_put(FSM_pid, BKey, RObj, ReqID, |
|
82 |
do_put(FSM_pid, BKey, RObj, ReqID, FSMTime, State), |
|
83 |
83 |
{noreply, State#state{mapcache=dict:erase(BKey,Cache)}}; |
84 |
84 |
handle_cast({get, FSM_pid, BKey, ReqID}, State=#state{idx=Idx}) -> |
85 |
85 |
riak_eventer:notify(riak_vnode, get, {ReqID, Idx}), |
| … | … | @@ -149,13 +149,19 @@ do_delete(Client, BKey, ReqID, |
149 |
149 |
simple_binary_put(BKey, Val, Mod, ModState) -> |
150 |
150 |
Mod:put(ModState, BKey, Val). |
151 |
151 |
|
152 |
do_put(FSM_pid, BKey, RObj, ReqID, |
|
152 |
do_put(FSM_pid, BKey, RObj, ReqID, PruneTime, |
|
153 |
153 |
_State=#state{idx=Idx,mod=Mod,modstate=ModState}) -> |
154 |
{ok,Ring} = riak_ring_manager:get_my_ring(), |
|
155 |
{Bucket,_Key} = BKey, |
|
156 |
BProps = riak_bucket:get_bucket(Bucket, Ring), |
|
154 |
157 |
case syntactic_put_merge(Mod, ModState, BKey, RObj, ReqID) of |
155 |
158 |
oldobj -> |
156 |
159 |
riak_eventer:notify(riak_vnode,put_reply,ReqID), |
157 |
160 |
gen_fsm:send_event(FSM_pid, {dw, Idx, ReqID}); |
158 |
{newobj, |
|
161 |
{newobj, NewObj} -> |
|
162 |
VC = riak_object:vclock(NewObj), |
|
163 |
ObjToStore = riak_object:set_vclock(NewObj, |
|
164 |
vclock:prune(VC,PruneTime,BProps)), |
|
159 |
165 |
Val = term_to_binary(ObjToStore, [compressed]), |
160 |
166 |
case simple_binary_put(BKey, Val, Mod, ModState) of |
161 |
167 |
ok -> |
| … | … | @@ -280,3 +286,4 @@ syntactic_put_merge(Mod, ModState, BKey, |
280 |
286 |
false -> {newobj, ResObj} |
281 |
287 |
end |
282 |
288 |
end. |
289 |
Up to file-list src/riak_vnode_master.erl:
| … | … | @@ -37,9 +37,9 @@ handle_cast({vnode_map, {Partition,_Node |
37 |
37 |
% (obligation done, now the problem of the vnodes) |
38 |
38 |
{noreply, State}; |
39 |
39 |
handle_cast({vnode_put, {Partition,_Node}, |
40 |
{FSM_pid,BKey,RObj,ReqID |
|
40 |
{FSM_pid,BKey,RObj,ReqID,FSMTime}}, State) -> |
|
41 |
41 |
Pid = get_vnode(Partition, State), |
42 |
gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID |
|
42 |
gen_server2:cast(Pid, {put, FSM_pid, BKey, RObj, ReqID, FSMTime}), |
|
43 |
43 |
% (obligation done, now the problem of the vnodes) |
44 |
44 |
{noreply, State}; |
45 |
45 |
handle_cast({vnode_get, {Partition,_Node}, |
Up to file-list src/vclock.erl:
29 |
29 |
-author('Andy Gross <andy@basho.com>'). |
30 |
30 |
|
31 |
31 |
-export([fresh/0,descends/2,merge/1,get_counter/2,get_timestamp/2, |
32 |
increment/2,all_nodes/1,equal/2 |
|
32 |
increment/2,all_nodes/1,equal/2,prune/3]). |
|
33 |
33 |
|
34 |
34 |
-include_lib("eunit/include/eunit.hrl"). |
35 |
35 |
|
| … | … | @@ -167,3 +167,76 @@ equal(VA,VB) -> |
167 |
167 |
false -> true |
168 |
168 |
end |
169 |
169 |
end. |
170 |
||
171 |
% @doc Possibly shrink the size of a vclock, depending on current age and size.
%      A vclock is a list of {NodeId, {Counter, Timestamp}} entries. Entries
%      are examined oldest-first and dropped from the old end according to
%      four bucket properties:
%        small_vclock - clocks with at most this many entries are never pruned
%        young_vclock - pruning stops once the oldest entry is younger than this
%        big_vclock   - clocks longer than this always lose their oldest entry
%        old_vclock   - the oldest entry is dropped when it is older than this
%      Now and the entry timestamps must be in the same unit; ages are
%      computed as Now - Timestamp.
%      The returned clock is ordered by timestamp, oldest first.
% @spec prune(V::vclock(), Now::integer(), BucketProps::term()) -> vclock()
prune(V,Now,BucketProps) ->
    % Order by timestamp and break ties on the node id. lists:sort/2 expects
    % a "less than or equal" ordering fun; comparing {Timestamp, NodeId} with
    % =< honours that contract and makes the outcome independent of the order
    % in which entries arrive, so entries sharing a timestamp are always
    % pruned in the same order.
    SortV = lists:sort(fun({N1,{_,T1}},{N2,{_,T2}}) -> {T1,N1} =< {T2,N2} end,
                       V),
    prune_vclock1(SortV,Now,BucketProps).

% @private
% First stage: decide whether the clock is protected from pruning outright,
% either because it is small enough or because its oldest entry is still young.
% V must already be sorted oldest-first.
prune_vclock1(V,Now,BProps) ->
    case length(V) =< proplists:get_value(small_vclock,BProps) of
        true -> V;
        false ->
            {_,{_,HeadTime}} = hd(V),
            case (Now - HeadTime) < proplists:get_value(young_vclock,BProps) of
                true -> V;
                false -> prune_vclock1(V,Now,BProps,HeadTime)
            end
    end.

% @private
% Second stage: has a precondition that V is longer than small and older than
% young. Drops the oldest entry when the clock is too big or that entry is too
% old, then re-runs the first stage on the remainder.
prune_vclock1(V,Now,BProps,HeadTime) ->
    case length(V) > proplists:get_value(big_vclock,BProps) of
        true -> prune_vclock1(tl(V),Now,BProps);
        false ->
            case (Now - HeadTime) > proplists:get_value(old_vclock,BProps) of
                true -> prune_vclock1(tl(V),Now,BProps);
                false -> V
            end
    end.
|
198 |
||
199 |
prune_small_test() ->
    % A clock holding fewer entries than small_vclock must come back
    % unchanged, no matter how old its entries are.
    Now = riak_util:moment(),
    Stamp = Now - 32000000,
    Clock = [{Id, {Count, Stamp}}
             || {Id, Count} <- [{<<"1">>,1}, {<<"2">>,2}, {<<"3">>,3}]],
    Pruned = prune(Clock, Now, [{small_vclock,4}]),
    % Compare as sorted lists: only the set of entries matters here.
    ?assertEqual(lists:sort(Clock), lists:sort(Pruned)).
|
208 |
||
209 |
prune_young_test() ->
    % A clock whose entries are all younger than young_vclock must come back
    % unchanged even though it exceeds small_vclock.
    Now = riak_util:moment(),
    Stamp = Now - 1,
    Clock = [{Id, {Count, Stamp}}
             || {Id, Count} <- [{<<"1">>,1}, {<<"2">>,2}, {<<"3">>,3}]],
    Pruned = prune(Clock, Now, [{small_vclock,1},{young_vclock,1000}]),
    % Compare as sorted lists: only the set of entries matters here.
    ?assertEqual(lists:sort(Clock), lists:sort(Pruned)).
|
218 |
||
219 |
prune_big_test() ->
    % A clock that is neither small nor young is cut down until it holds
    % no more than big_vclock entries.
    Now = riak_util:moment(),
    Stamp = Now - 1000,
    Clock = [{Id, {Count, Stamp}}
             || {Id, Count} <- [{<<"1">>,1}, {<<"2">>,2}, {<<"3">>,3}]],
    Limits = [{small_vclock,1},{young_vclock,1},
              {big_vclock,2},{old_vclock,100000}],
    Pruned = prune(Clock, Now, Limits),
    ?assert(length(Pruned) =:= 2).
|
230 |
||
231 |
prune_old_test() ->
    % A clock that is neither small nor young is cut down to at most
    % big_vclock entries, and additionally loses every entry older than
    % old_vclock; here only the single recent entry survives.
    Now = riak_util:moment(),
    Recent = Now - 1000,
    Stale = Now - 100000,
    Clock = [{<<"1">>, {1, Recent}},
             {<<"2">>, {2, Stale}},
             {<<"3">>, {3, Stale}}],
    Limits = [{small_vclock,1},{young_vclock,1},
              {big_vclock,2},{old_vclock,10000}],
    Pruned = prune(Clock, Now, Limits),
    ?assert(length(Pruned) =:= 1).
