| commit 721: | 6f3c9a65fa57 |
| parent 720: | 779150a843f2 |
| branch: | default |
First pass at JS reduce functions
Changed (Δ617 bytes):
raw changeset »
apps/js_data/src/jsd_generator.erl (1 lines added, 3 lines removed)
apps/riak/src/riak_reduce_phase_fsm.erl (22 lines added, 8 lines removed)
Up to file-list apps/js_data/src/jsd_generator.erl:
6 |
6 |
generate(Client, Count) -> |
7 |
7 |
rand_init(), |
8 |
8 |
Records = generate_data(Client, Count, []), |
9 |
lists:foreach(fun(Record) -> |
|
10 |
Obj = riak_object:new(<<"customers">>, <<"customer_list">>, Record), |
|
11 |
|
|
9 |
Client:put(riak_object:new(<<"customers">>, <<"customer_list">>, Records), 1). |
|
12 |
10 |
|
13 |
11 |
%% Internal functions |
14 |
12 |
generate_data(_Client, 0, Accum) -> |
Up to file-list apps/riak/src/riak_reduce_phase_fsm.erl:
10 |
10 |
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
11 |
11 |
%% KIND, either express or implied. See the License for the |
12 |
12 |
%% specific language governing permissions and limitations |
13 |
%% under the License. |
|
13 |
%% under the License. |
|
14 |
14 |
|
15 |
15 |
-module(riak_reduce_phase_fsm). |
16 |
16 |
-behaviour(gen_fsm). |
19 |
19 |
-export([init/1, handle_event/3, handle_sync_event/4, |
20 |
20 |
handle_info/3, terminate/3, code_change/4]). |
21 |
21 |
|
22 |
-export([wait/2]). |
|
22 |
-export([wait/2]). |
|
23 |
23 |
|
24 |
-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input |
|
24 |
-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input, jsctx}). |
|
25 |
25 |
|
26 |
26 |
start_link(_Ring,QTerm,NextFSM,Coordinator) -> |
27 |
27 |
gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator], []). |
| … | … | @@ -29,13 +29,15 @@ start_link(_Ring,QTerm,NextFSM,Coordinat |
29 |
29 |
init([QTerm,NextFSM,Coordinator]) -> |
30 |
30 |
{_,_,_,Acc} = QTerm, |
31 |
31 |
riak_eventer:notify(riak_reduce_phase_fsm, reduce_start, start), |
32 |
{ok, Ctx} = js_driver:new(), |
|
32 |
33 |
{ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,fresh_input=false, |
33 |
coord=Coordinator,acc=Acc,reduced=[] |
|
34 |
coord=Coordinator,acc=Acc,reduced=[], jsctx=Ctx}}. |
|
34 |
35 |
|
35 |
36 |
wait(timeout, StateData=#state{next_fsm=NextFSM,done=Done, |
36 |
37 |
acc=Acc,fresh_input=Fresh, |
37 |
38 |
qterm={reduce,FunTerm,Arg,_Acc}, |
38 |
coord=Coord,reduced=Reduced |
|
39 |
coord=Coord,reduced=Reduced, |
|
40 |
jsctx=JsCtx}) -> |
|
39 |
41 |
{Res,Red} = case Fresh of |
40 |
42 |
false -> |
41 |
43 |
{{next_state, wait, StateData#state{reduced=Reduced}},Reduced}; |
| … | … | @@ -43,7 +45,8 @@ wait(timeout, StateData=#state{next_fsm= |
43 |
45 |
try |
44 |
46 |
NewReduced = case FunTerm of |
45 |
47 |
{qfun,F} -> F(Reduced,Arg); |
46 |
{modfun,M,F} -> M:F(Reduced,Arg) |
|
48 |
{modfun,M,F} -> M:F(Reduced,Arg); |
|
49 |
{jsfun, F} -> invoke_js(JsCtx, <<"riak_reducer">>, F, Reduced, Arg) |
|
47 |
50 |
end, |
48 |
51 |
{{next_state, wait, StateData#state{reduced=NewReduced}}, |
49 |
52 |
NewReduced} |
| … | … | @@ -62,7 +65,7 @@ wait(timeout, StateData=#state{next_fsm= |
62 |
65 |
true -> |
63 |
66 |
case NextFSM of |
64 |
67 |
final -> nop; |
65 |
_ -> |
|
68 |
_ -> |
|
66 |
69 |
gen_fsm:send_event(NextFSM, {input, Red}), |
67 |
70 |
gen_fsm:send_event(NextFSM, done) |
68 |
71 |
end, |
| … | … | @@ -95,9 +98,20 @@ handle_sync_event(_Event, _From, _StateN |
95 |
98 |
handle_info(_Info, _StateName, StateData) -> |
96 |
99 |
{stop,badmsg,StateData}. |
97 |
100 |
%% @private |
98 |
terminate(Reason, _StateName, |
|
101 |
terminate(Reason, _StateName, State) -> |
|
99 |
102 |
riak_eventer:notify(riak_reduce_phase_fsm, phase_end, Reason), |
103 |
js_driver:destroy(State#state.jsctx), |
|
100 |
104 |
Reason. |
105 |
||
101 |
106 |
%% @private |
102 |
107 |
code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. |
103 |
108 |
|
109 |
%% @private |
|
110 |
invoke_js(JsCtx, FunName, FunSource, Reduced, Arg) -> |
|
111 |
js:define(JsCtx, list_to_binary(["var ", FunName, "=", FunSource])), |
|
112 |
case js:call(JsCtx, FunName, [Reduced, Arg]) of |
|
113 |
{ok, Result} -> |
|
114 |
Result; |
|
115 |
Error -> |
|
116 |
Error |
|
117 |
end. |
