LAgent: an agent framework in F# – Part VIII - Implementing MapReduce (user model) - Luca Bolognese


Luca


Download framework here.

All posts are here:

For this post I use a newer version of the framework that I just uploaded to CodeGallery. As I used LAgent, I grew more and more unhappy with the weakly typed way of sending messages. The code that implements that feature is nasty: full of upcasts and downcasts. I was losing faith in it. Bugs were cropping up in all sorts of scenarios (e.g. using generic union types as messages).

In the end I decided to re-architect the framework to make it strongly typed. In essence, each agent can now receive messages of only a single type. The limitations this design choice introduces (e.g. more limited hot swapping) are compensated for by catching errors at compile time and by the streamlining of the code. I left the old framework on the site in case you disagree with me.
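To make the "one message type per agent" idea concrete, here is a minimal sketch that uses FSharp.Core's MailboxProcessor rather than LAgent (the post doesn't show LAgent's new API, so none of it is used here). The CounterMessage union and the counter agent are hypothetical, invented for illustration. Because the mailbox is a MailboxProcessor<CounterMessage>, posting a value of any other type is rejected by the compiler instead of failing at runtime.

```fsharp
// Sketch of a strongly typed agent with FSharp.Core's MailboxProcessor (NOT LAgent's API).
// CounterMessage is a hypothetical message type: the agent accepts only this type.
type CounterMessage =
    | Increment of int
    | GetCount of AsyncReplyChannel<int>

let counter =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // The loop parameter carries the agent's state: the running total
        let rec loop total = async {
            let! msg = inbox.Receive()
            match msg with
            | Increment n -> return! loop (total + n)
            | GetCount reply ->
                reply.Reply total
                return! loop total }
        loop 0)

counter.Post(Increment 3)
counter.Post(Increment 4)
// counter.Post("hello")   // would not compile: the agent only accepts CounterMessage
let current = counter.PostAndReply(fun reply -> GetCount reply)
printfn "%d" current
```

Messages are processed in the order they are posted, so the reply reflects both increments.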

In any case, today’s post is about MapReduce. It assumes that you know what it is (the original Google paper that served as inspiration is here: Google Research Publication: MapReduce). What would it take to implement an in-memory MapReduce using my agent framework?

Let’s start with the user model.

let mapReduce   (inputs:seq<'in_key * 'in_value>)
                (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>)
                (reduce:'out_key -> seq<'out_value> -> seq<'reducedValues>)
                outputAgent
                (M:int) (R:int)
                (partitionF:'out_key -> int -> int) =
    // body omitted: this post only covers the user model

mapReduce takes seven parameters:

  1. inputs: a sequence of input key/value pairs.
  2. map: this function operates on each input key/value pair. It returns a sequence of output key/value pairs. The types of the output keys and values can differ from those of the inputs.
  3. reduce: this function operates on an output key and all the values associated with it. It returns a sequence of reduced values (e.g. the average of all the values for this key).
  4. outputAgent: this is the agent that gets notified every time a new output key has been reduced, and once more when the whole operation completes.
  5. M: how many mapper agents to instantiate.
  6. R: how many reducer agents to instantiate.
  7. partitionF: the partition function used to choose which of the reducers is associated with a key.
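To pin down what mapReduce computes, independent of how the work is spread over agents, here is a minimal sequential sketch. mapReduceSequential is a hypothetical helper, not the agent-based implementation: it ignores outputAgent, M, R and partitionF, returns each key's reduced values as a list instead of notifying an agent, and assumes reduce does not depend on the order in which a key's values arrive.

```fsharp
// Sequential sketch of MapReduce semantics (hypothetical helper, not LAgent's
// agent-based implementation): map every input pair, group the intermediate
// pairs by output key (the "shuffle"), then reduce each group.
let mapReduceSequential
        (inputs: seq<'inKey * 'inValue>)
        (map: 'inKey -> 'inValue -> seq<'outKey * 'outValue>)
        (reduce: 'outKey -> seq<'outValue> -> seq<'reduced>) =
    inputs
    // map phase: one input pair can produce many output pairs
    |> Seq.collect (fun (key, value) -> map key value)
    // shuffle: gather all the values that share an output key
    |> Seq.groupBy fst
    // reduce phase: turn each key's values into its reduced values
    |> Seq.map (fun (key, pairs) -> key, List.ofSeq (reduce key (Seq.map snd pairs)))
    |> List.ofSeq

// Example: count characters across two tiny "files"
let charCounts =
    mapReduceSequential
        ["a", "xy"; "b", "yz"]
        (fun _ (text: string) -> text |> Seq.map (fun c -> c, 1))
        (fun _ ones -> Seq.singleton (Seq.sum ones))
printfn "%A" charCounts
```

In the agent version the same three phases happen concurrently: M mappers run the map phase, partitionF replaces the global group-by, and R reducers each handle their own share of the keys.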

Let’s look at how to use this function to find how often each word is used in a set of files. First, a simple partition function can be defined as:

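let partitionF = fun (key:'a) (M:int) ->
                    // mask the sign bit instead of calling abs: abs Int32.MinValue throws OverflowException
                    (key.GetHashCode() &&& 0x7FFFFFFF) % M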

Given a key and the number of buckets M, it picks one of the buckets (an index between 0 and M - 1). Its type is 'a -> int -> int, so it’s fairly reusable.
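Here is a small sketch of how a partition function routes intermediate pairs to R reducers so that every occurrence of a key reaches the same reducer. bucketOf and routeToReducers are hypothetical helpers; bucketOf uses F#'s hash and masks off the sign bit, a choice made here because abs Int32.MinValue throws an OverflowException.

```fsharp
// Hypothetical helper: pick a reducer index in [0, r) for a key.
let bucketOf (key: 'k) (r: int) =
    // Mask off the sign bit rather than calling abs (abs Int32.MinValue throws)
    (hash key &&& 0x7FFFFFFF) % r

// Hypothetical helper: distribute (key, value) pairs over r reducer buckets so that
// all pairs with the same key end up in the same bucket.
let routeToReducers (r: int) (pairs: seq<'k * 'v>) =
    let buckets = Array.init r (fun _ -> ResizeArray<'k * 'v>())
    for (key, value) in pairs do
        buckets.[bucketOf key r].Add((key, value))
    buckets |> Array.map List.ofSeq

let routed = routeToReducers 2 ["I", 1; "was", 1; "I", 1; "going", 1]
routed |> Array.iteri (fun i bucket -> printfn "reducer %d: %A" i bucket)
```

Which bucket a given word lands in can change between runs on .NET Core, where string hash codes are randomized per process; what matters is that the choice is consistent within a single run.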

Let’s also create a basic agent that just prints out the reduced values:

let printer = spawnWorker (fun msg ->
                            match msg with
                            | Reduced(key, value)   -> printfn "%A %A" key value
                            | MapReduceDone         -> printfn "All done!!")

The agent gets notified whenever a new key is reduced and when the algorithm ends. It is useful to be notified immediately instead of waiting for everything to be done. If I hadn’t written this code using agents, I would not have realized that possibility; I would simply have framed the problem as a function that takes an input and returns an output. Agents force you to think explicitly about the parallelism in your app. That’s a good thing.

The mapping function simply splits the content of a file into words and adds a word/1 pair to the list for each word. I know that there are much better ways to do this (e.g. regular expressions for the parsing, and summing word counts inside the function), but I wanted to test the basic framework capabilities, and doing it this way exercises them better.

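open System.Collections.Generic
let map = fun (fileName:string) (fileContent:string) ->
            let l = new List<string * int>()
            // delimiters: punctuation plus whitespace and control characters
            let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'\n';'\t';'\f';'\r';'\b'|]
            // RemoveEmptyEntries keeps "" out of the counts when two delimiters are adjacent
            fileContent.Split(wordDelims, System.StringSplitOptions.RemoveEmptyEntries)
            |> Seq.iter (fun word -> l.Add((word, 1)))
            l :> seq<string * int>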
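The better approach mentioned above, regular expressions for the parsing and summing counts inside the map function, might look like the following sketch. mapWithRegex is a hypothetical name, and treating a word as a run of \w characters is an assumption that differs slightly from the delimiter list above. It emits one (word, count) pair per distinct word in each file, which a summing reducer handles unchanged.

```fsharp
open System.Text.RegularExpressions

// Hypothetical alternative mapper: extract words with a regex and pre-aggregate
// the counts per file, so each distinct word yields a single (word, count) pair.
let mapWithRegex = fun (fileName: string) (fileContent: string) ->
    Regex.Matches(fileContent, @"\w+")
    |> Seq.cast<Match>                  // MatchCollection -> seq<Match>
    |> Seq.countBy (fun m -> m.Value)   // (word, occurrences in this file)

let pairs = mapWithRegex "File1" "I was going to the airport when I saw someone crossing"
printfn "%A" (List.ofSeq pairs)
```

Pre-aggregating in the mapper reduces the number of messages sent to the reducers, at the cost of hiding the one-message-per-word load that the simpler version uses to exercise the framework.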

The reducer function simply sums the various word statistics sent by the mappers:

let reduce = fun key (values:seq<int>) -> [values |> Seq.sum] |> seq<int>

Now we can create some fake input to check that it works:

let testInput = ["File1", "I was going to the airport when I saw someone crossing";
                 "File2", "I was going home when I saw you coming toward me"]

And execute the mapReduce:

mapReduce testInput map reduce printer 2 2 partitionF

On my machine I get the following. You might get a different order because of the async/parallel processing involved. If I wanted a stable order, I would need to change the printer agent to cache results on Reduced and process them on MapReduceDone (see next post).

"I" [4]
"crossing" [1]
"going" [2]
"home" [1]
"me" [1]
"the" [1]
"toward" [1]
"airport" [1]
"coming" [1]
"saw" [2]
"someone" [1]
"to" [1]
"was" [2]
"when" [2]
"you" [1]
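The caching printer mentioned above (cache results on Reduced, process them on MapReduceDone) could look roughly like the sketch below. The post doesn't show how LAgent declares these messages, so MapReduceMessage is a hypothetical union that only mirrors the case names used by printer, and the agent is built with FSharp.Core's MailboxProcessor instead of spawnWorker. The TaskCompletionSource is a further assumption, there only so a caller can wait for the sorted results.

```fsharp
open System.Threading.Tasks

// Hypothetical message type mirroring the case names used by the printer agent
type MapReduceMessage<'key, 'value> =
    | Reduced of 'key * 'value
    | MapReduceDone

// Buffers results as they arrive and prints them sorted by key once everything is done.
// 'finished' completes with the sorted results so callers can wait for the output.
let sortedPrinter (finished: TaskCompletionSource<('key * 'value) list>) =
    MailboxProcessor<MapReduceMessage<'key, 'value>>.Start(fun inbox ->
        let rec loop (cache: ('key * 'value) list) = async {
            let! msg = inbox.Receive()
            match msg with
            | Reduced(key, value) -> return! loop ((key, value) :: cache)
            | MapReduceDone ->
                let sorted = List.sortBy fst cache
                sorted |> List.iter (fun (key, value) -> printfn "%A %A" key value)
                printfn "All done!!"
                finished.SetResult sorted }
        loop [])

let finished = TaskCompletionSource<(string * int list) list>()
let printerAgent = sortedPrinter finished
printerAgent.Post(Reduced("was", [2]))
printerAgent.Post(Reduced("I", [4]))
printerAgent.Post(Reduced("going", [2]))
printerAgent.Post(MapReduceDone)
let results = finished.Task.Result
```

Keys are sorted with F#'s structural comparison, which is ordinal for strings, so "I" sorts before the lowercase words.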

In the next post we’ll process some real books …


Comments

Gary Davidson

2009-09-08T23:24:59Z

I cannot compile in VS 2008 or VS2010.
VS2010 says the build failed because of the method FromContinuations
in AgentSystem.fs:
type AsyncResultCell<'T>() =
       let source = new TaskCompletionSource<'T>()
       member this.RegisterResult r = source.SetResult(r)
       member this.AsyncWaitResult =
           Async.FromContinuations(fun (cont,_,_) ->
               let y = fun (t:Task<'T>) -> cont (t.Result)
               source.Task.ContinueWith(y) |> ignore)

Oh sorry,
it was called Primitive in VS10 B1. I'm using my dev machine to code this, which has B2.
Just change the code and it should work (or wait for B2 to show up :-) )  

Gary Davidson

2009-09-11T19:51:09Z

Still having compile issues, sigh!!! So who do you have to sleep with to get b2? lol

If you replace it with Primitive you get:
Error 1 The method 'ContinueWith' is overloaded. Possible matches are shown below (or in the Error List window). D:\Agents\AgentSystem.fs 169 32 Agents
Error 2 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 3 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 4 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 5 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 6 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 7 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 8 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, scheduler: TaskScheduler) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 9 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, continuationOptions: TaskContinuationOptions) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 10 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, scheduler: TaskScheduler) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 11 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, continuationOptions: TaskContinuationOptions) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 12 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, scheduler: TaskScheduler) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 13 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, continuationOptions: TaskContinuationOptions) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 14 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, scheduler: TaskScheduler) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 15 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 16 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, scheduler: TaskScheduler) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 17 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 18 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, scheduler: TaskScheduler) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 19 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, continuationOptions: TaskContinuationOptions) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 20 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task<'T>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 21 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,'TNewResult>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 22 Possible overload: 'Task.ContinueWith<'TNewResult>(continuationFunction: Func<Task<'T>,Task<'TNewResult>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TNewResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 23 Possible overload: 'Task.ContinueWith(continuationAction: Action<Task>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 24 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,'TResult>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
Error 25 Possible overload: 'Task.ContinueWith<'TResult>(continuationFunction: Func<Task,Task<'TResult>>, continuationOptions: TaskContinuationOptions, scheduler: TaskScheduler) : Task<'TResult>'. D:\Agents\AgentSystem.fs 169 32 Agents
