-
Notifications
You must be signed in to change notification settings - Fork 3
Passive queue
This example illustrates the combination of a producer
queue and a passive
queue. The producer
queue is configured with 3 workers picking jobs from the passive queue as quickly as they can. This pattern is quite similar to a worker pool of gen_tcp acceptors.
Fun = fun() -> io:fwrite("~p starting...~n",[self()]), Res = jobs:dequeue(q, 3), io:fwrite("Res = ~p~n", [Res]) end, application:set_env(jobs, queues, [{pool, producer, Fun, [{regulators, [ {counter,[{limit,3}]} ]} ]}, {q, passive,[]}]).
Here is a shell printout. The setup creates a pool of size 3, where the producer processes call jobs:dequeue(q, N). This function blocks if the queue is empty. When I put something in the queue, it is immediately forwarded to one of the waiting processes in the pool, which completes, terminates, letting the jobs_server spawn a new worker.
If it is important to reuse worker processes, this could be done simply by letting the producer fun loop and repeatedly call dequeue/2. If it dies, a new worker is started, to ensure the pool is filled.
You can only enqueue/dequeue to a passive queue. Any other type of queue will cause a badarg exception in the client.
=PROGRESS REPORT==== 13-Jan-2011::10:34:32 === application: sasl started_at: nonode@nohost Eshell V5.8.1 (abort with ^G) 1> application:set_env(jobs, queues, [{pool, producer, fun() -> io:fwrite("~p starting...~n",[self()]), Res = jobs:dequeue(q, 3), io:fwrite("Res = ~p~n", [Res]) end, [{regulators, [{counter,[{limit,3}]}]}]}, {q, passive,[]}]). ok 2> application:start(jobs). =PROGRESS REPORT==== 13-Jan-2011::10:34:44 === supervisor: {local,jobs_app} started: [{pid,<0.47.0>}, {name,jobs_server}, {mfargs,{jobs_server,start_link,[]}}, {restart_type,permanent}, {shutdown,3000}, {child_type,worker}] <0.48.0> starting... <0.49.0> starting... <0.50.0> starting... ok 3> =PROGRESS REPORT==== 13-Jan-2011::10:34:44 === supervisor: {local,kernel_safe_sup} started: [{pid,<0.52.0>}, {name,timer_server}, {mfargs,{timer,start_link,[]}}, {restart_type,permanent}, {shutdown,1000}, {child_type,worker}] =PROGRESS REPORT==== 13-Jan-2011::10:34:44 === supervisor: {local,jobs_app} started: [{pid,<0.51.0>}, {name,jobs_sampler}, {mfargs,{jobs_sampler,start_link,[]}}, {restart_type,permanent}, {shutdown,3000}, {child_type,worker}] =PROGRESS REPORT==== 13-Jan-2011::10:34:44 === application: jobs started_at: nonode@nohost 3> jobs:enqueue(q, job1). ok Res = [{36911304349436,job1}] 4> <0.55.0> starting... 4> jobs:enqueue(q, job2). ok Res = [{36911308613434,job2}] 5> <0.58.0> starting...