#+SETUPFILE: "basho-doc-style.iorg"
#+TITLE: Using Javascript with Riak Map/Reduce
Riak supports writing map/reduce query functions in Javascript, as
well as specifying query execution over HTTP. This document will
teach you how to use these features.
* Simple Example
This section hits the ground running with a quick example to
demonstrate what HTTP/Javascript map/reduce looks like in Riak.
This example will store several chunks of text in Riak, and then
compute word counts over the set of documents.
** Load data
We will use the Riak HTTP interface to store the texts we want to
process:
#+BEGIN_EXAMPLE
$ curl -X PUT -H "content-type: text/plain" \
http://localhost:8098/riak/alice/p1 --data-binary @-
Alice was beginning to get very tired of sitting by her sister on the
bank, and of having nothing to do: once or twice she had peeped into the
book her sister was reading, but it had no pictures or conversations in
it, 'and what is the use of a book,' thought Alice 'without pictures or
conversation?'
^D
$ curl -X PUT -H "content-type: text/plain" \
http://localhost:8098/riak/alice/p2 --data-binary @-
So she was considering in her own mind (as well as she could, for the
hot day made her feel very sleepy and stupid), whether the pleasure
of making a daisy-chain would be worth the trouble of getting up and
picking the daisies, when suddenly a White Rabbit with pink eyes ran
close by her.
^D
$ curl -X PUT -H "content-type: text/plain" \
http://localhost:8098/riak/alice/p5 --data-binary @-
The rabbit-hole went straight on like a tunnel for some way, and then
dipped suddenly down, so suddenly that Alice had not a moment to think
about stopping herself before she found herself falling down a very deep
well.
^D
#+END_EXAMPLE
** Run query
With data loaded, we can now run a query:
#+BEGIN_EXAMPLE
$ curl -X POST -H "content-type: application/json" http://localhost:8098/mapred --data @-
{"inputs":[["alice","p1"],["alice","p2"],["alice","p5"]],"query":[{"map":{"language":"javascript","source":"function(v) { var m = v.values[0].data.toLowerCase().match('\\\\w*','g'); var r = []; for(var i in m) if (m[i] != '') { var o = {}; o[m[i]] = 1; r.push(o); } return r; }"}},{"reduce":{"language":"javascript","source":"function(v) { var r = {}; for (var i in v) { for(var w in v[i]) { if (w in r) r[w] += v[i][w]; else r[w] = v[i][w]; } } return [r]; }"}}]}
^D
#+END_EXAMPLE
And we end up with the word counts for the three documents.
#+BEGIN_EXAMPLE
[{"the":8,"rabbit":2,"hole":1,"went":1,"straight":1,"on":2,"like":1,"a":6,"tunnel":1,"for":2,"some":1,"way":1,"and":5,"then":1,"dipped":1,"suddenly":3,"down":2,"so":2,"that":1,"alice":3,"had":3,"not":1,"moment":1,"to":3,"think":1,"about":1,"stopping":1,"herself":2,"before":1,"she":4,"found":1,"falling":1,"very":3,"deep":1,"well":2,"was":3,"considering":1,"in":2,"her":5,"own":1,"mind":1,"as":2,"could":1,"hot":1,"day":1,"made":1,"feel":1,"sleepy":1,"stupid":1,"whether":1,"pleasure":1,"of":5,"making":1,"daisy":1,"chain":1,"would":1,"be":1,"worth":1,"trouble":1,"getting":1,"up":1,"picking":1,"daisies":1,"when":1,"white":1,"with":1,"pink":1,"eyes":1,"ran":1,"close":1,"by":2,"beginning":1,"get":1,"tired":1,"sitting":1,"sister":2,"bank":1,"having":1,"nothing":1,"do":1,"once":1,"or":3,"twice":1,"peeped":1,"into":1,"book":2,"reading":1,"but":1,"it":2,"no":1,"pictures":2,"conversations":1,"what":1,"is":1,"use":1,"thought":1,"without":1,"conversation":1}]
#+END_EXAMPLE
** Explanation
The following sections describe what each bit of syntax means, along
with other syntax options. In brief, this example map/reduce query
worked as follows:
1. The objects named =p1=, =p2=, and =p5= from the =alice= bucket
were given as inputs to the query.
2. The map function from the map phase was run on each object. The
function:
#+BEGIN_SRC javascript
function(v) {
  // lower-case the text, as the query above does, then pick out the words
  var m = v.values[0].data.toLowerCase().match('\\w*','g');
  var r = [];
  for(var i in m)
    if (m[i] != '') {
      var o = {};
      o[m[i]] = 1;
      r.push(o);
    }
  return r;
}
#+END_SRC
creates a list of JSON objects, one for each word occurrence (not
de-duplicated) in the text. Each object has the word as its only
key, and the integer 1 as the value for that key.
3. The reduce function from the reduce phase was run on the outputs of the
map functions. The function:
#+BEGIN_SRC javascript
function(v) {
  var r = {};
  for (var i in v) {
    for(var w in v[i]) {
      if (w in r)
        r[w] += v[i][w];
      else
        r[w] = v[i][w];
    }
  }
  return [r];
}
#+END_SRC
looks at each JSON object in the input list. It steps through
each key in each object, and produces a new object. That new
object has a key for each key found in any of the input objects,
the value of that key being the sum of the values of that key
across the input objects. It returns this new object in a list,
because it may be run a second time on a list including that
object and more inputs from the map phase.
4. The final output is a list with one element: a JSON object with
a key for each word in all of the documents (unique), with the
value of that key being the number of times the word appeared in
the documents.
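To experiment with the two functions without a running cluster, they
can be exercised locally in any Javascript interpreter. The following
is a minimal sketch under stated assumptions: the =fakeObject= helper
and its sample texts are hypothetical stand-ins for the objects Riak
would pass in, and the map function uses the regular-expression
literal =/\w+/g= rather than the two-argument =match= call from the
query, which not every interpreter honors.
#+BEGIN_SRC javascript
// Hypothetical stand-in for the object Riak passes to a map function.
function fakeObject(key, text) {
  return { bucket: 'alice', key: key, values: [ { metadata: {}, data: text } ] };
}

// Map: emit one {word: 1} object per word occurrence.
var mapWords = function(v) {
  var m = v.values[0].data.toLowerCase().match(/\w+/g) || [];
  var r = [];
  for (var i = 0; i < m.length; i++) {
    var o = {};
    o[m[i]] = 1;
    r.push(o);
  }
  return r;
};

// Reduce: sum the counts for each word across all input objects.
var reduceCounts = function(v) {
  var r = {};
  for (var i in v) {
    for (var w in v[i]) {
      if (w in r) r[w] += v[i][w];
      else r[w] = v[i][w];
    }
  }
  return [r];
};

// Map each sample text separately, then reduce the combined output.
var mapped = mapWords(fakeObject('x1', 'The cat saw the dog'))
  .concat(mapWords(fakeObject('x2', 'the dog ran')));
var counts = reduceCounts(mapped);
console.log(JSON.stringify(counts));
#+END_SRC
Because the reduce output has the same shape as the map output, the
list in =counts= could be concatenated with further map results and
passed through =reduceCounts= again.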
* Query Syntax
Map/Reduce queries are issued over HTTP via a POST to the /mapred
resource. The body should be =application/json= of the form
={"inputs":[...inputs...],"query":[...query...]}=.
Map/Reduce queries have a default timeout of 60000 milliseconds
(60 seconds). The default timeout can be overridden by supplying
a different value, in milliseconds, in the JSON document
={"inputs":[...inputs...],"query":[...query...],"timeout": 90000}=
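Escaping Javascript source by hand inside a JSON string is easy to get
wrong. One possible approach, sketched here, is to assemble the body
in Javascript and let =JSON.stringify= do the escaping. The
=buildMapredBody= helper is hypothetical and not part of Riak; it only
produces the string that would be POSTed to /mapred.
#+BEGIN_SRC javascript
// Hypothetical helper: assemble the JSON body for a POST to /mapred.
function buildMapredBody(inputs, phases, timeoutMs) {
  var body = { inputs: inputs, query: phases };
  if (timeoutMs !== undefined) {
    body.timeout = timeoutMs; // milliseconds; omit to accept the default
  }
  return JSON.stringify(body);
}

// Writing the phase function as real code and calling toString() lets
// JSON.stringify take care of quotes and backslashes.
var identityMap = function(v) { return [v]; };

var mapredBody = buildMapredBody(
  [['alice', 'p1'], ['alice', 'p2']],
  [{ map: { language: 'javascript', source: identityMap.toString() } }],
  90000
);
console.log(mapredBody);
#+END_SRC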
** Inputs
The list of input objects is given as a list of 2-element lists of
the form =[Bucket,Key]= or 3-element lists of the form
=[Bucket,Key,KeyData]=.
You may also pass just the name of a bucket
(={"inputs":"mybucket",...}=), which is equivalent to passing all
of the keys in that bucket as inputs (i.e. "a map/reduce across the
whole bucket"). You should be aware that this triggers the
somewhat expensive "list keys" operation, so you should use it
sparingly.
** Query
The query is given as a list of phases, each phase being of the
form ={PhaseType:{...spec...}}=. Valid =PhaseType= values are
"map", "reduce", and "link".
Every phase spec may include a =keep= field, which must have a
boolean value: =true= means that the results of this phase should
be included in the final result of the map/reduce, =false= means
the results of this phase should be used only by the next phase.
Omitting the =keep= field accepts its default value, which is
=false= for all phases except the final phase (Riak assumes that
you were most interested in the results of the last phase of your
map/reduce query).
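The default rule for =keep= can be modeled in a few lines. This is
only an illustration of the rule as described above, not Riak's own
code; =effectiveKeep= is a hypothetical helper that reports, for each
phase, whether its results would appear in the final output.
#+BEGIN_SRC javascript
// Hypothetical helper: report which phases contribute to the final result.
function effectiveKeep(phases) {
  return phases.map(function(phase, index) {
    var type = Object.keys(phase)[0];      // "map", "reduce", or "link"
    var spec = phase[type];
    if (typeof spec.keep === 'boolean') {
      return spec.keep;                    // an explicit value wins
    }
    return index === phases.length - 1;    // default: only the last phase
  });
}

var keepFlags = effectiveKeep([
  { link:   { bucket: 'foo' } },
  { map:    { language: 'javascript', name: 'Riak.mapValues', keep: true } },
  { reduce: { language: 'javascript', name: 'Riak.reduceSort' } }
]);
console.log(keepFlags);
#+END_SRC
Here the link phase is dropped by default, the map phase is kept
because it says so explicitly, and the reduce phase is kept because it
is last.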
*** Map
Map phases must be told where to find the code for the function to
execute, and what language that function is in.
Function source can be specified directly in the query by using
the "source" spec field. Function source can also be loaded from
a pre-stored riak object by providing "bucket" and "key" fields in
the spec.
For example:
:{"map":{"language":"javascript","source":"function(v) { return [v]; }","keep":true}}
would run the Javascript function given in the spec, and include
the results in the final output of the m/r query.
:{"map":{"language":"javascript","bucket":"myjs","key":"mymap","keep":false}}
would run the Javascript function declared in the content of the
Riak object under =mymap= in the =myjs= bucket, and the results of
the function would not be included in the final output of the m/r
query.
Map phases may also be passed static arguments by using the "arg"
spec field.
*** Reduce
Reduce phases look exactly like map phases, but are labeled "reduce".
*** Link
Link phases accept =bucket= and =tag= fields that specify which
links match the link query. The string "_" (underscore) in each
field means "match all", while any other string means "match
exactly this string". If either field is left out, it is
considered to be set to "_" (match all).
For example:
:{"link":{"bucket":"foo","keep":false}}
would follow all links pointing to objects in the =foo= bucket,
regardless of their tag.
* Javascript Functions
** Function Parameters
*** Map functions
Map functions are passed three parameters: the object that the map
is being applied to, the "keydata" for that object, and the static
argument for the phase.
The object will be a JSON object of the form:
#+BEGIN_EXAMPLE
{
"bucket":BucketAsString,
"key":KeyAsString,
"vclock":VclockAsString,
"values":[
{
"metadata":{
"X-Riak-VTag":VtagAsString,
"X-Riak-Last-Modified":LastModAsString,
...other metadata...
},
"data":ObjectData
},
...other metadata/data values (siblings)...
]
}
#+END_EXAMPLE
=object.values[0].data= is probably what you will be interested in
most of the time, but the rest of the details of the object are
provided for your use.
The "keydata" is the third element of the item from the input
bucket/key list (called =KeyData= in the [[Inputs]] section above), or
"undefined" if none was provided.
The static argument for the phase is the value of the =arg= field
from the map spec in the query list.
A map phase should produce a list of results. You will see errors
if the output of your map function is not a list. Return the
empty list if your map function chooses not to produce output.
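As an illustration of the three parameters, the sketch below emits an
object's key only when its text contains a given word. It assumes the
phase's =arg= is a JSON object with a =word= field and that the
keydata is a plain string; both are choices made for this example, and
=sampleObject= is a hypothetical stand-in for what Riak would pass in.
#+BEGIN_SRC javascript
// Map function: emit the object's key (tagged with its keydata) when the
// stored text contains the word given in the phase's static argument.
var mapIfContains = function(object, keyData, arg) {
  var text = object.values[0].data;
  if (text.indexOf(arg.word) === -1) {
    return [];                       // nothing to emit, but still a list
  }
  return [ { key: object.key, label: keyData } ];
};

// Hypothetical stand-in for the object Riak would supply.
var sampleObject = {
  bucket: 'alice', key: 'p2', vclock: '',
  values: [ { metadata: {}, data: 'a White Rabbit with pink eyes' } ]
};

var hit  = mapIfContains(sampleObject, 'chapter-1', { word: 'Rabbit' });
var miss = mapIfContains(sampleObject, undefined,   { word: 'Queen' });
console.log(JSON.stringify(hit), JSON.stringify(miss));
#+END_SRC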
*** Reduce functions
Reduce functions are passed two parameters: a list of inputs to
reduce, and the static argument for the phase.
The list of inputs to reduce may contain values from previous
executions of the reduce function. It will also contain results
produced by the preceding map or reduce phase.
The static argument for the phase is the value of the =arg= field
from the reduce spec in the query list.
A reduce phase should produce a list of results. You will see
errors if the output of your reduce function is not a list. The
function should return an empty list if it has no other output to
produce.
*** Link functions
If you are storing data through the HTTP interface, and using the
=Link= HTTP header, you do not need to worry about writing a
link-extraction function. Just use the predefined
=raw_link_walker_resource:mapreduce_linkfun/3=.
But, if you need to extract links from your data in some other
manner, there are many ways to specify Javascript functions to do
that. They all start with setting the =linkfun= bucket property.
Through the HTTP interface:
:$ curl -X PUT -H "content-type: application/json" http://localhost:8098/riak/bucket \
:> --data "{\"props\":{\"linkfun\":{...function...}}}"
The three ways to fill in the value of the =linkfun= key are:
+ Quoted source code, as the value of the =jsanon= key:
:{"jsanon":"function(v,kd,bt) { return []; }"}
+ The bucket and key of an object containing the function source:
:{"jsanon":{"bucket":Bucket,"key":Key}}
+ The name of a predefined Javascript function:
:{"jsfun":FunctionName}
The function has basically the same contract as a map function.
The first argument is the object from which links should be
extracted. The second argument is the =KeyData= for the object.
The third argument is a Javascript object describing which links
to return. Its two fields, =bucket= and =tag=, will have the
values given in the link phase spec from the query.
The link fun should return a list of the same form as the =inputs=
list: 2-item bucket/key lists, or 3-item bucket/key/keydata lists.
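A link function following this contract might look like the sketch
below. It assumes, purely for illustration, that each object's data
is JSON text with a =links= array of ={bucket, key, tag}= entries;
that layout, and the names =fieldMatches=, =extractLinks=, and
=linkSource=, are hypothetical. An omitted pattern field is treated
the same as "_".
#+BEGIN_SRC javascript
// "_" (or a missing field) in the link phase spec means "match all".
function fieldMatches(pattern, value) {
  return pattern === undefined || pattern === '_' || pattern === value;
}

// Link function sketch: v is the object, kd its KeyData, bt the
// {bucket, tag} pattern taken from the link phase spec.
var extractLinks = function(v, kd, bt) {
  var doc = JSON.parse(v.values[0].data);
  var links = doc.links || [];
  var out = [];
  for (var i = 0; i < links.length; i++) {
    var l = links[i];
    if (fieldMatches(bt.bucket, l.bucket) && fieldMatches(bt.tag, l.tag)) {
      out.push([l.bucket, l.key]);   // same form as an "inputs" entry
    }
  }
  return out;
};

// Hypothetical object whose data carries its own link list.
var linkSource = {
  bucket: 'people', key: 'alice',
  values: [ { metadata: {}, data: JSON.stringify({
    links: [
      { bucket: 'foo', key: 'a', tag: 'friend' },
      { bucket: 'bar', key: 'b', tag: 'friend' },
      { bucket: 'foo', key: 'c', tag: 'parent' }
    ]
  }) } ]
};

var fooLinks    = extractLinks(linkSource, undefined, { bucket: 'foo', tag: '_' });
var friendLinks = extractLinks(linkSource, undefined, { bucket: '_', tag: 'friend' });
console.log(JSON.stringify(fooLinks), JSON.stringify(friendLinks));
#+END_SRC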
* How Map/Reduce Queries Work
** Map/Reduce Intro
The main goal of Map/Reduce is to spread the processing of a query
across many systems to take advantage of parallel processing power.
This is generally done by dividing the query into several steps,
dividing the dataset into several chunks, and then running those
step/chunk pairs on separate physical hosts.
One step type is called "map". Map functions take one piece of
data as input, and produce zero or more results as output. If
you're familiar with "mapping over a list" in functional
programming style, you're already familiar with "map" steps in a
map/reduce query.
Another step type is called "reduce". The purpose of a "reduce"
step is to combine the output of many "map" step evaluations into
one result.
The common example of a map/reduce query involves a "map" step that
takes a body of text as input, and produces a word count for that
body of text. A reduce step then takes the word counts produced
from many bodies of text and either sums them to provide a word
count for the corpus, or filters them to produce a list of
documents containing only certain counts.
** Riak-specific Map/Reduce
*** How Riak Spreads Processing
Riak's map/reduce has an additional goal: increasing data-locality.
When processing a large dataset, it's often much more efficient to
take the computation to the data than it is to bring the data to
the computation.
It is Riak's solution to the data-locality problem that determines
how Riak spreads the processing across the cluster. In the same
way that any Riak node can coordinate a read or write by sending
requests directly to the other nodes responsible for maintaining
that data, any Riak node can also coordinate a map/reduce query by
sending a map-step evaluation request directly to the node
responsible for maintaining the input data. Map-step results are
sent back to the coordinating node, where reduce-step processing
can produce a unified result.
Put more simply: Riak runs map-step functions right on the node
holding the input data for those functions, and it runs reduce-step
functions on the node coordinating the map/reduce query.
*** How Riak's Map/Reduce Queries Are Specified
Map/Reduce queries in Riak have two components: a list of inputs
and a list of "steps", or "phases".
Each element of the input list is a bucket-key pair. This
bucket-key pair may also be annotated with "key-data", which will
be passed as an argument to a map function, when evaluated on the
object stored under that bucket-key pair.
Each element of the phases list is a description of a map
function, a reduce function, or a link function. The description
includes where to find the code for the phase function (for map
and reduce phases), static data passed to the function every time
it is executed during that phase, and a flag indicating whether or
not to include the results of that phase in the final output of
the query.
The phase list describes the chain of operations each input will
flow through. That is, the initial inputs will be fed to the
first phase in the list, and the output of that phase will be fed
as input to the next phase in the list. This stream will continue
through the final phase.
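The flow of data through the phase list can be pictured with a
simplified, single-process model. This sketch ignores distribution,
link phases, and the =keep= flag, and it feeds plain strings to the
map function instead of stored objects; =runPhases= and its
=type= / =fn= fields are hypothetical and do not match Riak's query
format.
#+BEGIN_SRC javascript
// Simplified model of the phase chain: each phase's output becomes the
// next phase's input.
function runPhases(inputs, phases) {
  var current = inputs;
  for (var p = 0; p < phases.length; p++) {
    var phase = phases[p];
    if (phase.type === 'map') {
      // a map function runs once per input; the result lists are concatenated
      var mappedOut = [];
      for (var i = 0; i < current.length; i++) {
        mappedOut = mappedOut.concat(phase.fn(current[i], undefined, phase.arg));
      }
      current = mappedOut;
    } else {
      // a reduce function sees the whole list at once
      current = phase.fn(current, phase.arg);
    }
  }
  return current;
}

var chainResult = runPhases(
  ['one two', 'three four five'],
  [
    { type: 'map', fn: function(text) { return [text.split(' ').length]; } },
    { type: 'reduce', fn: function(list) {
        var sum = 0;
        for (var i = 0; i < list.length; i++) sum += list[i];
        return [sum];
      } }
  ]
);
console.log(chainResult);
#+END_SRC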
*** How a Map Phase Works in Riak
The input list to a map phase must be a list of (possibly
annotated) bucket-key pairs. For each pair, Riak will send the
request to evaluate the map function to the partition that is
responsible for storing the data for that bucket-key. The vnode
hosting that partition will look up the object stored under that
bucket-key, and evaluate the map function with the object as an
argument. The other arguments to the function will be the
annotation included with the bucket-key, if any, and the
static data for the phase, as specified in the query.
*** How a Reduce Phase Works in Riak
Reduce phases accept any list of data as input, and produce any
list of data as output. They also receive a phase-static value,
specified in the query definition.
The important thing to understand is that the function defining
the reduce phase may be evaluated multiple times, and the input of
later evaluations will include the output of earlier evaluations.
For example, a reduce phase may implement the "set-union"
function. In that case, the first set of inputs might be
=[1,2,2,3]=, and the output would be =[1,2,3]=. When the phase
receives more inputs, say =[3,4,5]=, the function will be called
with the concatenation of the two lists: =[1,2,3,3,4,5]=.
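The set-union example can be replayed locally. The following is a
minimal sketch; =reduceUnion= is a hypothetical name, and values are
compared by their JSON text, which is an assumption of this example.
#+BEGIN_SRC javascript
// Reduce function implementing set-union: keep the first occurrence of
// each distinct value.
var reduceUnion = function(values) {
  var seen = {};
  var out = [];
  for (var i = 0; i < values.length; i++) {
    var k = JSON.stringify(values[i]);
    if (!Object.prototype.hasOwnProperty.call(seen, k)) {
      seen[k] = true;
      out.push(values[i]);
    }
  }
  return out;
};

// First evaluation, then a second one that receives the earlier output
// concatenated with new inputs.
var first  = reduceUnion([1, 2, 2, 3]);
var second = reduceUnion(first.concat([3, 4, 5]));
console.log(JSON.stringify(first), JSON.stringify(second));
#+END_SRC
The function gives a correct result on the second call because its
output is itself a valid input: a list of values with no special
wrapper.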
Other systems refer to the second application of the reduce
function as a "re-reduce". There are at least a couple of
reduce-query implementation strategies that work with Riak's model.
One strategy is to implement the phase preceding the reduce
phase, such that its output is "the same shape" as the output of
the reduce phase. This is how the examples in this document are
written, and the way that we have found produces cleaner code.
An alternate strategy is to make the output of a reduce phase
recognizable, such that it can be extracted from the input list on
subsequent applications. For example, if inputs from the
preceding phase are numbers, outputs from the reduce phase could
be objects or strings. This would allow the function to find the
previous result, and apply new inputs to it.
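The alternate strategy might be sketched as follows, assuming the
preceding phase emits plain numbers. The =sum= field and the name
=reduceSumTagged= are hypothetical choices for this example.
#+BEGIN_SRC javascript
// Inputs from the preceding phase are plain numbers; the reduce output is
// an object, so an earlier result can be told apart on later evaluations.
var reduceSumTagged = function(values) {
  var running = 0;
  for (var i = 0; i < values.length; i++) {
    var v = values[i];
    if (typeof v === 'number') {
      running += v;        // fresh input from the preceding phase
    } else {
      running += v.sum;    // result of an earlier reduce evaluation
    }
  }
  return [ { sum: running } ];
};

var partialResult = reduceSumTagged([1, 2, 3]);
var finalResult   = reduceSumTagged(partialResult.concat([4, 5]));
console.log(JSON.stringify(partialResult), JSON.stringify(finalResult));
#+END_SRC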
*** How a Link Phase Works in Riak
Link phases find links matching patterns specified in the query
definition. The patterns specify which buckets and tags links
must have.
"Following a link" means adding it to the output list of this
phase. The output of this phase is often most useful as input to
a map phase, or a reduce phase.
*** Using Named Functions
Riak can also use pre-defined named functions for map and reduce
phase processing. Named functions are invoked with the following
form:
#+BEGIN_EXAMPLE
{"map": {"language": "javascript", "name": "Riak.mapValues", "keep": true}}
{"reduce": {"language": "javascript", "name": "Riak.reduceSort", "keep": true}}
#+END_EXAMPLE
The key =name= in both examples points to the name of the function
to be used. Riak expects the function to be defined prior to the
execution of the phase using it.
**** Defining Named Functions
Defining a named function for Riak is a simple process.
1. Create a Javascript source file containing the definitions for
all the functions you would like Riak to pre-define.
2. Edit the =app.config= of your Riak nodes and add the line
={js_source_dir, <path_to_source_dir>}= to the =riak=
configuration block. =<path_to_source_dir>= should point to
the directory where the file created in step #1 was saved.
3. Start using the functions in your map/reduce jobs.
When =js_source_dir= is enabled, Riak scans the directory for
files ending in =.js=. These files are then loaded into each
Javascript VM when it is created.
NOTE: Named functions must be available on all nodes in a cluster
for proper map/reduce results.
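A source file for step 1 might look like the sketch below. The file
name, the =MyFunctions= object, and both function names are
hypothetical. Grouping the functions under one global object mirrors
the =Riak.mapValues= naming seen earlier, on the assumption that
user-defined names are resolved the same way.
#+BEGIN_SRC javascript
// my_functions.js (hypothetical file name), saved in the js_source_dir
var MyFunctions = {
  // Map: emit the length of the object's first value.
  mapDataLength: function(object, keyData, arg) {
    return [ object.values[0].data.length ];
  },
  // Reduce: keep only the largest number seen so far. The output is a
  // list of numbers, so it is safe to feed back in as input.
  reduceLongest: function(values, arg) {
    if (values.length === 0) { return []; }
    return [ Math.max.apply(null, values) ];
  }
};
#+END_SRC
Under that assumption, a phase would then refer to one of these
functions by name:
:{"map":{"language":"javascript","name":"MyFunctions.mapDataLength"}}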
**** Why Use Named Functions?
Named functions can be better than anonymous functions in certain
situations. Since named functions live in a file they can be
managed using source code control and deployed automatically
using tools such as Chef or Puppet. This can be a significant
advantage when administering large Riak clusters.
More important, though, is the fact that named functions execute
much faster than the equivalent anonymous functions. Invoking
anonymous functions requires Riak to ensure the anonymous
function is defined before invoking it. Named functions allow
Riak to skip the definition check and execute the function call
immediately.
Also, since named functions do not change between invocations,
Riak is able to cache named function call results and short
circuit the call entirely. Currently, Riak performs this
optimization on named functions executed during map phases only.
In general, anonymous functions should be used during development
and named functions should be used for production deployments
where possible. This combination provides the maximum flexibility
and performance.
**** Riak-Supplied Functions
Riak supplies several named functions out of the box. These
functions are defined on a global Javascript object named =Riak=
and should not be modified or overridden. These functions, along
with descriptions and notes on their use, are listed in the
next two sections.
***** Named Map Functions
+ =Riak.mapValues(values, keyData, arg)=
Extracts and returns only the values contained in a bucket and key.
+ =Riak.mapValuesJson(values, keyData, arg)=
Same as =mapValues= except the values are passed through a JSON
decoder first.
***** Named Reduce Functions
+ =Riak.reduceSum(values, arg)=
Returns the sum of =values=.
+ =Riak.reduceMin(values, arg)=
Returns the minimum value from =values=.
+ =Riak.reduceMax(values, arg)=
Returns the maximum value from =values=.
+ =Riak.reduceSort(values, arg)=
Returns the sorted version of =values=. If =arg= is the source
to a Javascript function, it will be eval'd and used to
control the sort via =Array.sort=.
+ =Riak.reduceLimit(values, arg)=
Returns the leftmost n members of values where =arg= is used as n.
+ =Riak.reduceSlice(values, arg)=
Returns a slice of the values array. =arg= must be a two
element array containing the starting and ending positions for
the slice.
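Several of these functions can be chained in one query. The sketch
below builds such a query as a Javascript object and also checks the
sort comparator locally with =Array.sort=. It assumes the objects in
the hypothetical =mybucket= bucket hold JSON numbers; the comparator
source and the limit of 3 are example values only.
#+BEGIN_SRC javascript
// Comparator source passed as the static argument to Riak.reduceSort.
var sortSource = 'function(a, b) { return b - a; }';   // descending order

var namedQuery = {
  inputs: 'mybucket',   // whole-bucket input; triggers "list keys"
  query: [
    { map:    { language: 'javascript', name: 'Riak.mapValuesJson' } },
    { reduce: { language: 'javascript', name: 'Riak.reduceSort', arg: sortSource } },
    { reduce: { language: 'javascript', name: 'Riak.reduceLimit', arg: 3 } }
  ]
};
console.log(JSON.stringify(namedQuery));

// Local check of what the comparator does, independent of Riak.
var comparator = eval('(' + sortSource + ')');
var sortedLocally = [5, 1, 9, 3].sort(comparator).slice(0, 3);
console.log(sortedLocally);
#+END_SRC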