basho / riak (http://riak.basho.com/)

Riak is a decentralized datastore from Basho Technologies.

Clone this repository (size: 12.5 MB): HTTPS / SSH
$ hg clone http://hg.basho.com/riak
commit 721: 6f3c9a65fa57
parent 720: 779150a843f2
branch: default
First pass at JS reduce functions
kevsmith
8 months ago

Changed (Δ617 bytes):

raw changeset »

apps/js_data/src/jsd_generator.erl (1 lines added, 3 lines removed)

apps/riak/src/riak_reduce_phase_fsm.erl (22 lines added, 8 lines removed)

Up to file-list apps/js_data/src/jsd_generator.erl:

6
6
generate(Client, Count) ->
7
7
    rand_init(),
8
8
    Records = generate_data(Client, Count, []),
9
    lists:foreach(fun(Record) ->
10
                          Obj = riak_object:new(<<"customers">>, <<"customer_list">>, Record),
11
                          Client:put(Obj, 1) end, Records).
9
    Client:put(riak_object:new(<<"customers">>, <<"customer_list">>, Records), 1).
12
10
13
11
%% Internal functions
14
12
generate_data(_Client, 0, Accum) ->

Up to file-list apps/riak/src/riak_reduce_phase_fsm.erl:

10
10
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
11
11
%% KIND, either express or implied.  See the License for the
12
12
%% specific language governing permissions and limitations
13
%% under the License.    
13
%% under the License.
14
14
15
15
-module(riak_reduce_phase_fsm).
16
16
-behaviour(gen_fsm).
19
19
-export([init/1, handle_event/3, handle_sync_event/4,
20
20
         handle_info/3, terminate/3, code_change/4]).
21
21
22
-export([wait/2]). 
22
-export([wait/2]).
23
23
24
-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input}).
24
-record(state, {done,qterm,next_fsm,coord,acc,reduced,fresh_input, jsctx}).
25
25
26
26
start_link(_Ring,QTerm,NextFSM,Coordinator) ->
27
27
    gen_fsm:start_link(?MODULE, [QTerm,NextFSM,Coordinator], []).
@@ -29,13 +29,15 @@ start_link(_Ring,QTerm,NextFSM,Coordinat
29
29
init([QTerm,NextFSM,Coordinator]) ->
30
30
    {_,_,_,Acc} = QTerm,
31
31
    riak_eventer:notify(riak_reduce_phase_fsm, reduce_start, start),
32
    {ok, Ctx} = js_driver:new(),
32
33
    {ok,wait,#state{done=false,qterm=QTerm,next_fsm=NextFSM,fresh_input=false,
33
                    coord=Coordinator,acc=Acc,reduced=[]}}.
34
                    coord=Coordinator,acc=Acc,reduced=[], jsctx=Ctx}}.
34
35
35
36
wait(timeout, StateData=#state{next_fsm=NextFSM,done=Done,
36
37
                               acc=Acc,fresh_input=Fresh,
37
38
                               qterm={reduce,FunTerm,Arg,_Acc},
38
                               coord=Coord,reduced=Reduced}) ->
39
                               coord=Coord,reduced=Reduced,
40
                               jsctx=JsCtx}) ->
39
41
    {Res,Red} = case Fresh of
40
42
        false ->
41
43
            {{next_state, wait, StateData#state{reduced=Reduced}},Reduced};
@@ -43,7 +45,8 @@ wait(timeout, StateData=#state{next_fsm=
43
45
            try
44
46
                NewReduced = case FunTerm of
45
47
                                 {qfun,F} -> F(Reduced,Arg);
46
                                 {modfun,M,F} -> M:F(Reduced,Arg)
48
                                 {modfun,M,F} -> M:F(Reduced,Arg);
49
                                 {jsfun, F} -> invoke_js(JsCtx, <<"riak_reducer">>, F, Reduced, Arg)
47
50
                             end,
48
51
                {{next_state, wait, StateData#state{reduced=NewReduced}},
49
52
                 NewReduced}
@@ -62,7 +65,7 @@ wait(timeout, StateData=#state{next_fsm=
62
65
        true ->
63
66
            case NextFSM of
64
67
                final -> nop;
65
                _ -> 
68
                _ ->
66
69
                    gen_fsm:send_event(NextFSM, {input, Red}),
67
70
                    gen_fsm:send_event(NextFSM, done)
68
71
            end,
@@ -95,9 +98,20 @@ handle_sync_event(_Event, _From, _StateN
95
98
handle_info(_Info, _StateName, StateData) ->
96
99
    {stop,badmsg,StateData}.
97
100
%% @private
98
terminate(Reason, _StateName, _State) ->
101
terminate(Reason, _StateName, State) ->
99
102
    riak_eventer:notify(riak_reduce_phase_fsm, phase_end, Reason),
103
    js_driver:destroy(State#state.jsctx),
100
104
    Reason.
105
101
106
%% @private
102
107
code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}.
103
108
109
%% @private
110
invoke_js(JsCtx, FunName, FunSource, Reduced, Arg) ->
111
    js:define(JsCtx, list_to_binary(["var ", FunName, "=", FunSource])),
112
    case js:call(JsCtx, FunName, [Reduced, Arg]) of
113
        {ok, Result} ->
114
            Result;
115
        Error ->
116
            Error
117
    end.