Clojure client for Kue, built on top of Carmine.
Compatible with Kue 0.6.x; tested with Kue 0.6.2.
- Fully-functional Kue jobs.
- Parallel workers for job processing.
You can install clj-kue from the Clojars repository.
With Leiningen:
[intervox/clj-kue "0.2.1"]
With Maven:
<dependency>
<groupId>intervox</groupId>
<artifactId>clj-kue</artifactId>
<version>0.2.1</version>
</dependency>
clj-kue is built on top of Carmine.
To create a connection pool for your workers, use the connect! function from the clj-kue.redis namespace:
(require '[clj-kue.redis :as redis])
(redis/connect!)
You can also specify a connection spec (e.g. host, port, db, timeout, uri) and connection pool options:
(def spec-opts {:db 1})
(def pool-opts {:max-total 20})
(redis/connect! spec-opts pool-opts)
For the full list of available connection pool options, see the GenericKeyedObjectPool documentation.
To alter the connection spec or the connection pool options, use one of the following functions:
(redis/set-connection-pool! conn-pool)
(redis/set-connection-spec! conn-spec)
(redis/set-connection! {:pool conn-pool :spec conn-spec})
If you're using Carmine prior to 2.0.0 in your project, you should consider using the previous version of clj-kue.
To create a worker, use the Worker function from the clj-kue.worker namespace:
(use '[clj-kue.worker :only [Worker]])
(def worker (Worker "test" your-handler))
(.start worker)
You can specify any worker option at creation time:
(def worker (Worker "test" your-handler :maxidle 1000))
Right now workers support the following options:
- maxidle: maximum time to wait for a new job, in seconds (default nil). 0, nil or any falsy value results in an infinite timeout.
You can also use the get and set methods to read and update worker options, including the type and handler options:
(.set worker :maxidle 1000)
(.set worker :handler your-new-handler)
Workers use refs to store their state, which allows "hot" updates of their options. For example, you can set a new job handler without stopping the worker. You can also set a new job type to process, but it takes effect only on the next processing step.
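The hot update described above can be wrapped in a small helper that remembers the handler it replaces. This is a minimal sketch, not part of clj-kue: swap-handler! is a hypothetical name, and the sketch assumes that (.get worker :handler) returns the handler currently in use, mirroring the set calls shown above.

```clojure
;; Hypothetical helper, not part of clj-kue.
;; Assumes (.get worker :handler) returns the handler currently in use.
(defn swap-handler!
  "Installs new-handler on a worker and returns the handler it replaced."
  [worker new-handler]
  (let [old-handler (.get worker :handler)]
    (.set worker :handler new-handler)
    old-handler))

(comment
  ;; With a running worker created as shown above:
  (def previous (swap-handler! worker your-new-handler))
  ;; Roll back later, still without stopping the worker:
  (swap-handler! worker previous))
```

Keeping the previous handler around makes it possible to roll back a bad update without restarting the worker.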
Each worker has step, start and stop methods.
The step method reads a single job from Kue and processes it with the specified handler. If there are no jobs in the queue, step waits for one, with a maximum timeout of maxidle seconds.
The start method invokes the step method in an infinite loop in a separate thread.
The stop method performs a graceful shutdown of the worker. Calling stop closes the processing thread, but only at the next processing step, because there is no way to close the thread while it is waiting for a new job. By default, stop blocks until the worker is stopped, but you can specify a maximum timeout in milliseconds to wait. nil or a negative value results in an infinite timeout. You can pass a zero timeout if you don't want to wait at all. Note that stop uses interruptions to shut down threads after the timeout, so it may interrupt the processing of the job itself.
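The lifecycle above can be sketched as a helper that runs a worker for a fixed period and then shuts it down with a bounded wait. run-for is a hypothetical name, not part of clj-kue, and passing the timeout as the only argument to stop is an assumption based on the description above.

```clojure
;; Hypothetical helper, not part of clj-kue.
;; Assumes the stop timeout (in milliseconds) is passed as its only argument.
(defn run-for
  "Starts worker, lets it process jobs for run-ms milliseconds, then
  stops it, waiting at most grace-ms milliseconds for the shutdown."
  [worker run-ms grace-ms]
  (.start worker)
  (Thread/sleep (long run-ms))
  (.stop worker grace-ms))

(comment
  ;; Needs clj-kue on the classpath and a reachable Redis instance.
  (use '[clj-kue.worker :only [Worker]])
  (def worker (Worker "test" your-handler :maxidle 5))
  ;; Process jobs for one minute, then allow 10 seconds for a clean stop.
  (run-for worker 60000 10000))
```

A finite maxidle keeps each step bounded, which presumably gives stop a chance to take effect between steps instead of relying on the interruption after the timeout.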
A job handler is an ordinary function taking the current job as its single argument.
Since workers ignore the return value of the function, you should use job methods to pass processing results back to Kue. For the full list, see the IKueJob protocol.
Instead of using the Worker function from the clj-kue.worker namespace, you can spawn workers with the process function from the clj-kue.core namespace. The differences are that process lets you spawn multiple workers, and that it starts each worker immediately:
(use 'clj-kue.core)
; Starts 3 workers
(process "test" 3 your-handler)
Workers automatically handle all exceptions in job handlers. So, if any exception is thrown in a job handler, the worker logs the exception and treats the job as failed.
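Because a thrown exception marks the job as failed, a handler can reject bad input simply by throwing. The following is a minimal sketch under stated assumptions: validate-ticks and strict-handler are hypothetical names, not part of clj-kue, and the job data is assumed to be a map with ticks and log keys. The (.get job :data) and (.log job ...) calls are the job methods used in the example further down.

```clojure
;; Hypothetical names, not part of clj-kue; the data shape is illustrative.
(defn validate-ticks
  "Returns ticks if it is a positive integer, otherwise throws."
  [ticks]
  (if (and (integer? ticks) (pos? ticks))
    ticks
    (throw (ex-info "ticks must be a positive integer" {:ticks ticks}))))

(defn strict-handler
  "Job handler that rejects malformed job data by throwing."
  [job]
  (let [{:keys [ticks log]} (.get job :data)]
    ;; Throwing here is enough: the worker logs it and fails the job.
    (validate-ticks ticks)
    (.log job log)))

(comment
  ;; Needs clj-kue on the classpath and a reachable Redis instance.
  (use 'clj-kue.core)
  (process "test" 1 strict-handler))
```

Keeping the validation in a plain function means it can be tested without a queue or a worker.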
To handle the following job
var kue = require('./index.js')
  , jobs = kue.createQueue();
jobs.create('test', {
    title: 'Sample job with progress and log'
  , step: 1000
  , ticks: 100
  , log: 'Hello world'
}).save(function(){
  process.exit();
});
we spawn a single worker, reporting its progress and logging its results.
(process "test"
  (fn [job]
    (let [{:keys [step ticks log]} (.get job :data)]
      (dotimes [i ticks]
        (Thread/sleep step)
        (.progress job i ticks))
      (.log job log))))
Instead of using the clj-kue.core/process function to spawn workers, you can use helpers from the clj-kue.helpers namespace.
Available helper macros:
- with-kue-job wraps the execution of the job
- kue-handler creates a handler function with the specified bindings, wrapped with with-kue-job
- defhandler does the same as kue-handler, but also defines it
- kue-worker spawns a single worker with the specified kue-handler
Advantages of wrapping your handler with with-kue-job:
- It catches any console output and sends it to kue using the job's log method.
- If you're using clj-progress, it wraps it with a special progress-handler and sends it to kue using the job's progress method.
- Helpers are free of reflection.
The previous example may be rewritten using the clj-kue.helpers namespace:
(use '[clj-kue.helpers :only [kue-worker]])
(use 'clj-progress.core)
(kue-worker :test {:keys [step ticks log]}
  (init ticks)
  (dotimes [i ticks]
    (Thread/sleep step)
    (tick))
  (println log)
  (done))
Or, if you want to spawn multiple workers:
(use 'clj-kue.core)
(use '[clj-kue.helpers :only [defhandler]])
(use 'clj-progress.core)
(defhandler my-handler {:keys [step ticks log]}
  (init ticks)
  (dotimes [i ticks]
    (Thread/sleep step)
    (tick))
  (println log)
  (done))
(process :test 5 my-handler)
Copyright © 2013-2014 Leonid Beschastny
Distributed under the Eclipse Public License, the same as Clojure.