Skip to content

Add protocol for iterating keys in a store #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Nov 19, 2019
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject io.replikativ/konserve "0.5.1"
(defproject io.replikativ/konserve "0.6.0-SNAPSHOT"
:description "Durable cross-platform key-value store protocol with core.async."
:url "http://github.com/replikativ/konserve"
:license {:name "Eclipse Public License"
Expand Down
9 changes: 7 additions & 2 deletions src/konserve/core.cljc
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns konserve.core
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in exists? dissoc])
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in exists? dissoc keys])
(:require [konserve.protocols :refer [-exists? -get-in -assoc-in
-update-in -dissoc -bget -bassoc]]
-update-in -dissoc -bget -bassoc
-keys]]
[hasch.core :refer [uuid]]
#?(:clj [clojure.core.async :refer [chan poll! put! <! go]]
:cljs [cljs.core.async :refer [chan poll! put! <!]]))
Expand Down Expand Up @@ -203,6 +204,10 @@
(<! (-bassoc store key val))))


(defn keys
"Return a channel that will yield all top-level keys currently in the store."
([store] (-keys store)))


(comment
(require '[clojure.core.async :refer [<!! chan go] :as async])
Expand Down
36 changes: 24 additions & 12 deletions src/konserve/filestore.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
[konserve.protocols :refer [PEDNAsyncKeyValueStore
-exists? -get-in -update-in -dissoc -assoc-in
PBinaryAsyncKeyValueStore -bget -bassoc
-serialize -deserialize]])
-serialize -deserialize
PKeyIterable -keys]])
(:import [java.io
DataInputStream DataOutputStream
FileInputStream FileOutputStream
Expand Down Expand Up @@ -62,16 +63,18 @@
(let [f (io/file (str folder "/meta/" fn))
fd (io/file (str folder "/data/" fn))]
(if (and (.exists f) (.exists fd))
(let [fis (DataInputStream. (FileInputStream. f))]
(try
(-deserialize serializer read-handlers fis)
(catch Exception e
(ex-info "Could not read key."
{:type :read-error
:key fn
:exception e}))
(finally
(.close fis))))
(async/<!
(async/thread
(let [fis (DataInputStream. (FileInputStream. f))]
(try
(-deserialize serializer read-handlers fis)
(catch Exception e
(ex-info "Could not read key."
{:type :read-error
:key fn
:exception e}))
(finally
(.close fis))))))
(println "Stale key file detected: " fn))))))
async/merge
(async/into #{}))]
Expand Down Expand Up @@ -386,7 +389,16 @@
key-folder (str folder "/meta/")]
(async/thread
(do (write-edn-key serializer write-handlers read-handlers key-folder file-name {:key key :format :binary} config)
(write-binary folder (str file-name) key input config))))))
(write-binary folder (str file-name) key input config)))))

PKeyIterable
(-keys [this]
(let [ch (async/chan)]
(async/take!
(list-keys this)
(fn [ks]
(async/onto-chan ch (map :key ks))))
ch)))

(defmethod print-method FileSystemStore
[store writer]
Expand Down
11 changes: 9 additions & 2 deletions src/konserve/memory.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
(:require #?(:clj [clojure.core.async :refer [go]])
[konserve.protocols :refer [PEDNAsyncKeyValueStore
PBinaryAsyncKeyValueStore
-update-in]])
-update-in
PKeyIterable
-keys]]
[clojure.core.async :as async])
#?(:cljs (:require-macros [cljs.core.async.macros :refer [go]])))

(defrecord MemoryStore [state read-handlers write-handlers locks]
Expand All @@ -22,7 +25,11 @@
(-bassoc [this key input]
(go (swap! state assoc key {:input-stream input
:size :unknown})
nil)))
nil))

PKeyIterable
(-keys [_]
(async/to-chan (keys @state))))

#?(:clj
(defmethod print-method MemoryStore
Expand Down
6 changes: 6 additions & 0 deletions src/konserve/protocols.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@
"Allows binary data byte array storage."
(-bget [this key locked-cb] "Calls locked-cb with a platform specific binary representation inside the lock, e.g. wrapped InputStream on the JVM and Blob in JavaScript. You need to properly close/dispose the object when you are done!")
(-bassoc [this key val] "Copies given value (InputStream, Reader, File, byte[] or String on JVM, Blob in JavaScript) under key in the store."))


(defprotocol PKeyIterable
"Allows lazy iteration of keys in this store."
(-keys [this]
"Return a channel that will continuously yield keys in this store."))
9 changes: 6 additions & 3 deletions test/konserve/core_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns konserve.core-test
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in dissoc exists?])
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in dissoc exists? keys])
(:require [clojure.test :refer :all]
[clojure.core.async :refer [<!! go]]
[clojure.core.async :refer [<!! go] :as async]
[konserve.core :refer :all]
[konserve.memory :refer [new-mem-store]]
[konserve.filestore :refer [new-fs-store delete-store list-keys]]
Expand Down Expand Up @@ -38,7 +38,9 @@
(<!! (bget store :binbar (fn [{:keys [input-stream]}]
(go
(is (= (map byte (slurp input-stream))
(range 10))))))))))
(range 10)))))))
(is (= #{:baz :binbar}
(<!! (async/into #{} (keys store))))))))

(deftest append-store-test
(testing "Test the append store functionality."
Expand Down Expand Up @@ -78,6 +80,7 @@
(is (= (<!! (list-keys store))
#{{:key :binbar, :format :binary}}))
(is (= @binbar (range 10)))
(is (= [:binbar] (<!! (async/into [] (keys store)))))
(delete-store folder)
(let [store (<!! (new-fs-store folder))]
(is (= (<!! (list-keys store))
Expand Down
2 changes: 1 addition & 1 deletion test/konserve/filestore_migration_test.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns konserve.filestore-migration-test
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in dissoc exists? bget bassoc])
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in dissoc exists? bget bassoc keys])
(:require [clojure.test :refer :all]
[konserve.old-filestore :as old-store]
[clojure.core.async :refer [<!! >!! chan go]]
Expand Down
2 changes: 1 addition & 1 deletion test/konserve/serializers_test.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns konserve.serializers-test
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in dissoc exists?])
(:refer-clojure :exclude [get get-in update update-in assoc assoc-in dissoc exists? keys])
(:require [clojure.test :refer :all]
[clojure.core.async :refer [<!!]]
[konserve.core :refer :all]
Expand Down