diff options
author | Alex <alex@adnab.me> | 2024-01-11 10:52:12 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2024-01-11 10:52:12 +0000 |
commit | 723e56b37f13f078a15e067343191fb1bf96e8b2 (patch) | |
tree | 8188e7e8bc41867ce2de61cd7df6554e05678d11 /script/jepsen.garage/src | |
parent | a8b0e01f88b947bc34c05d818d51860b4d171967 (diff) | |
parent | fa9247f11b89c960dffe82d6bf990ed4335788e3 (diff) | |
download | garage-723e56b37f13f078a15e067343191fb1bf96e8b2.tar.gz garage-723e56b37f13f078a15e067343191fb1bf96e8b2.zip |
Merge pull request 'Jepsen testing (NLnet task 3 subtask 1)' (#544) from jepsen into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/544
Diffstat (limited to 'script/jepsen.garage/src')
-rw-r--r-- | script/jepsen.garage/src/jepsen/garage.clj | 105 | ||||
-rw-r--r-- | script/jepsen.garage/src/jepsen/garage/daemon.clj | 152 | ||||
-rw-r--r-- | script/jepsen.garage/src/jepsen/garage/nemesis.clj | 142 | ||||
-rw-r--r-- | script/jepsen.garage/src/jepsen/garage/reg.clj | 143 | ||||
-rw-r--r-- | script/jepsen.garage/src/jepsen/garage/s3api.clj | 48 | ||||
-rw-r--r-- | script/jepsen.garage/src/jepsen/garage/set.clj | 135 |
6 files changed, 725 insertions, 0 deletions
diff --git a/script/jepsen.garage/src/jepsen/garage.clj b/script/jepsen.garage/src/jepsen/garage.clj new file mode 100644 index 00000000..446b81de --- /dev/null +++ b/script/jepsen.garage/src/jepsen/garage.clj @@ -0,0 +1,105 @@ +(ns jepsen.garage + (:require + [clojure.string :as str] + [jepsen + [checker :as checker] + [cli :as cli] + [generator :as gen] + [nemesis :as nemesis] + [tests :as tests]] + [jepsen.os.debian :as debian] + [jepsen.garage + [daemon :as grg] + [nemesis :as grgNemesis] + [reg :as reg] + [set :as set]])) + +(def workloads + "A map of workload names to functions that construct workloads, given opts." + {"reg1" reg/workload1 + "reg2" reg/workload2 + "set1" set/workload1 + "set2" set/workload2}) + +(def scenari + "A map of scenari to the associated nemesis" + {"c" grgNemesis/scenario-c + "cp" grgNemesis/scenario-cp + "r" grgNemesis/scenario-r + "pr" grgNemesis/scenario-pr + "cpr" grgNemesis/scenario-cpr + "cdp" grgNemesis/scenario-cdp + "dpr" grgNemesis/scenario-dpr}) + +(def patches + "A map of patch names to Garage builds" + {"default" "v0.9.0" + "tsfix1" "d146cdd5b66ca1d3ed65ce93ca42c6db22defc09" + "tsfix2" "c82d91c6bccf307186332b6c5c6fc0b128b1b2b1" + "task3a" "707442f5de416fdbed4681a33b739f0a787b7834" + "task3b" "431b28e0cfdc9cac6c649193cf602108a8b02997" + "task3c" "0041b013a473e3ae72f50209d8f79db75a72848b"}) + +(def cli-opts + "Additional command line options." + [["-p" "--patch NAME" "Garage patch to use" + :default "default" + :validate [patches (cli/one-of patches)]] + ["-s" "--scenario NAME" "Nemesis scenario to run" + :default "cp" + :validate [scenari (cli/one-of scenari)]] + ["-r" "--rate HZ" "Approximate number of requests per second, per thread." + :default 10 + :parse-fn read-string + :validate [#(and (number? %) (pos? %)) "Must be a positive number"]] + [nil "--ops-per-key NUM" "Maximum number of operations on any given key." + :default 100 + :parse-fn parse-long + :validate [pos? "Must be a positive integer."]] + ["-w" "--workload NAME" "Workload of test to run" + :default "reg1" + :validate [workloads (cli/one-of workloads)]]]) + +(defn garage-test + "Given an options map from the command line runner (e.g. :nodes, :ssh, + :concurrency, ...), constructs a test map." + [opts] + (let [garage-version (get patches (:patch opts)) + db (grg/db garage-version) + workload ((get workloads (:workload opts)) opts) + scenario ((get scenari (:scenario opts)) (assoc opts :db db))] + (merge tests/noop-test + opts + {:pure-generators true + :name (str "garage-" (name (:patch opts)) " " (name (:workload opts)) " " (name (:scenario opts))) + :os debian/os + :db db + :client (:client workload) + :generator (gen/phases + (->> + (:generator workload) + (gen/stagger (/ (:rate opts))) + (gen/nemesis (:generator scenario)) + (gen/time-limit (:time-limit opts))) + (gen/log "Healing cluster") + (gen/nemesis (:final-generator scenario)) + (gen/log "Waiting for recovery") + (gen/sleep 10) + (gen/log "Running final generator") + (gen/clients (:final-generator workload)) + (gen/log "Generators all done")) + :nemesis (:nemesis scenario) + :checker (checker/compose + {:perf (checker/perf (:perf scenario)) + :workload (:checker workload)}) + }))) + + +(defn -main + "Handles command line arguments. Can either run a test, or a web server for + browsing results." + [& args] + (cli/run! (merge (cli/single-test-cmd {:test-fn garage-test + :opt-spec cli-opts}) + (cli/serve-cmd)) + args)) diff --git a/script/jepsen.garage/src/jepsen/garage/daemon.clj b/script/jepsen.garage/src/jepsen/garage/daemon.clj new file mode 100644 index 00000000..d407dd29 --- /dev/null +++ b/script/jepsen.garage/src/jepsen/garage/daemon.clj @@ -0,0 +1,152 @@ +(ns jepsen.garage.daemon + (:require [clojure.tools.logging :refer :all] + [jepsen [control :as c] + [core :as jepsen] + [db :as db]] + [jepsen.control.util :as cu])) + +; CONSTANTS -- HOW GARAGE IS SET UP + +(def base-dir "/opt/garage") +(def data-dir (str base-dir "/data")) +(def meta-dir (str base-dir "/meta")) +(def binary (str base-dir "/garage")) +(def logfile (str base-dir "/garage.log")) +(def pidfile (str base-dir "/garage.pid")) + +(def admin-token "icanhazadmin") +(def access-key-id "GK8bfb6a51286071c6c9cd8bc3") +(def secret-access-key "b0be95f71c1c6f16858a9edf395078b75c12ecb6b1c03385c4ae92076e4994a3") +(def bucket-name "jepsen") + +; THE GARAGE DB + +(defn install! + "Download and install Garage" + [node version] + (c/su + (c/trace + (info node "installing garage" version) + (c/exec :mkdir :-p base-dir) + (let [url (str "https://garagehq.deuxfleurs.fr/_releases/" version "/x86_64-unknown-linux-musl/garage") + cache (cu/cached-wget! url)] + (c/exec :cp cache binary)) + (c/exec :chmod :+x binary)))) + +(defn configure! + "Configure Garage" + [node] + (c/su + (c/trace + (cu/write-file! + (str "rpc_secret = \"0fffabe52542c2b89a56b2efb7dfd477e9dafb285c9025cbdf1de7ca21a6b372\"\n" + "rpc_bind_addr = \"0.0.0.0:3901\"\n" + "rpc_public_addr = \"" node ":3901\"\n" + "db_engine = \"lmdb\"\n" + "replication_mode = \"2\"\n" + "data_dir = \"" data-dir "\"\n" + "metadata_dir = \"" meta-dir "\"\n" + "[s3_api]\n" + "s3_region = \"us-east-1\"\n" + "api_bind_addr = \"0.0.0.0:3900\"\n" + "[k2v_api]\n" + "api_bind_addr = \"0.0.0.0:3902\"\n" + "[admin]\n" + "api_bind_addr = \"0.0.0.0:3903\"\n" + "admin_token = \"" admin-token "\"\n" + "trace_sink = \"http://192.168.56.1:4317\"\n") + "/etc/garage.toml")))) + +(defn connect-node! + "Connect a Garage node to the rest of the cluster" + [test node] + (c/trace + (let [node-id (c/exec binary :node :id :-q)] + (info node "node id:" node-id) + (c/on-many (:nodes test) + (c/exec binary :node :connect node-id))))) + +(defn configure-node! + "Configure a Garage node to be part of a cluster layout" + [test node] + (c/trace + (let [node-id (c/exec binary :node :id :-q)] + (c/on (jepsen/primary test) + (c/exec binary :layout :assign (subs node-id 0 16) :-c :1G :-z :dc1 :-t node))))) + +(defn finalize-config! + "Apply the layout and create a key/bucket pair in the cluster" + [node] + (c/trace + (c/exec binary :layout :apply :--version 1) + (info node "garage status:" (c/exec binary :status)) + (c/exec binary :key :import access-key-id secret-access-key :--yes) + (c/exec binary :bucket :create bucket-name) + (c/exec binary :bucket :allow :--read :--write bucket-name :--key access-key-id) + (info node "key info: " (c/exec binary :key :info access-key-id)))) + +(defn db + "Garage DB for a particular version" + [version] + (reify db/DB + (setup! [_ test node] + (install! node version) + (configure! node) + (cu/start-daemon! + {:logfile logfile + :pidfile pidfile + :chdir base-dir + :env {:RUST_LOG "garage=debug,garage_api=trace"}} + binary + :server) + (c/exec :sleep 3) + + (jepsen/synchronize test) + (connect-node! test node) + + (jepsen/synchronize test) + (configure-node! test node) + + (jepsen/synchronize test) + (when (= node (jepsen/primary test)) + (finalize-config! node))) + + (teardown! [_ test node] + (info node "tearing down garage" version) + (c/su + (cu/stop-daemon! binary pidfile) + (c/exec :rm :-rf logfile) + (c/exec :rm :-rf data-dir) + (c/exec :rm :-rf meta-dir))) + + db/Pause + (pause! [_ test node] + (cu/grepkill! :stop binary)) + (resume! [_ test node] + (cu/grepkill! :cont binary)) + + db/Kill + (kill! [_ test node] + (cu/stop-daemon! binary pidfile)) + (start! [_ test node] + (cu/start-daemon! + {:logfile logfile + :pidfile pidfile + :chdir base-dir + :env {:RUST_LOG "garage=debug,garage_api=trace"}} + binary + :server)) + + db/LogFiles + (log-files [_ test node] + [logfile]))) + +(defn creds + "Obtain Garage credentials for node" + [node] + {:access-key access-key-id + :secret-key secret-access-key + :endpoint (str "http://" node ":3900") + :bucket bucket-name + :client-config {:path-style-access-enabled true}}) + diff --git a/script/jepsen.garage/src/jepsen/garage/nemesis.clj b/script/jepsen.garage/src/jepsen/garage/nemesis.clj new file mode 100644 index 00000000..dfce0255 --- /dev/null +++ b/script/jepsen.garage/src/jepsen/garage/nemesis.clj @@ -0,0 +1,142 @@ +(ns jepsen.garage.nemesis + (:require [clojure.tools.logging :refer :all] + [jepsen [control :as c] + [core :as jepsen] + [generator :as gen] + [nemesis :as nemesis]] + [jepsen.nemesis.combined :as combined] + [jepsen.garage.daemon :as grg] + [jepsen.control.util :as cu])) + +; ---- reconfiguration nemesis ---- + +(defn configure-present! + "Configure node to be active in new cluster layout" + [test nodes] + (info "configure-present!" nodes) + (let [node-ids (c/on-many nodes (c/exec grg/binary :node :id :-q)) + node-id-strs (map (fn [[_ v]] (subs v 0 16)) node-ids)] + (c/on + (jepsen/primary test) + (apply c/exec (concat [grg/binary :layout :assign :-c :1G] node-id-strs))))) + +(defn configure-absent! + "Configure nodes to be active in new cluster layout" + [test nodes] + (info "configure-absent!" nodes) + (let [node-ids (c/on-many nodes (c/exec grg/binary :node :id :-q)) + node-id-strs (map (fn [[_ v]] (subs v 0 16)) node-ids)] + (c/on + (jepsen/primary test) + (apply c/exec (concat [grg/binary :layout :assign :-g] node-id-strs))))) + +(defn finalize-config! + "Apply the proposed cluster layout" + [test] + (let [layout-show (c/on (jepsen/primary test) (c/exec grg/binary :layout :show)) + [_ layout-next-version] (re-find #"apply --version (\d+)\n" layout-show)] + (if layout-next-version + (do + (info "layout show: " layout-show "; next-version: " layout-next-version) + (c/on (jepsen/primary test) + (c/exec grg/binary :layout :apply :--version layout-next-version))) + (info "no layout changes to apply")))) + +(defn reconfigure-subset + "Reconfigure cluster with only a subset of nodes" + [cnt] + (reify nemesis/Nemesis + (setup! [this test] this) + + (invoke! [this test op] op + (case (:f op) + :start + (let [[keep-nodes remove-nodes] + (->> (:nodes test) + shuffle + (split-at cnt))] + (info "layout split: keep " keep-nodes ", remove " remove-nodes) + (configure-present! test keep-nodes) + (configure-absent! test remove-nodes) + (finalize-config! test) + (assoc op :value keep-nodes)) + :stop + (do + (info "layout un-split: all nodes=" (:nodes test)) + (configure-present! test (:nodes test)) + (finalize-config! test) + (assoc op :value (:nodes test))))) + + (teardown! [this test] this))) + +; ---- nemesis scenari ---- + +(defn nemesis-op + "A generator for a single nemesis operation" + [op] + (fn [_ _] {:type :info, :f op})) + +(defn reconfiguration-package + "Cluster reconfiguration nemesis package" + [opts] + {:generator (->> + (gen/mix [(nemesis-op :reconfigure-start) + (nemesis-op :reconfigure-stop)]) + (gen/stagger (:interval opts 5))) + :final-generator {:type :info, :f :reconfigure-stop} + :nemesis (nemesis/compose + {{:reconfigure-start :start + :reconfigure-stop :stop} (reconfigure-subset 3)}) + :perf #{{:name "reconfigure" + :start #{:reconfigure-start} + :stop #{:reconfigur-stop} + :color "#A197E9"}}}) + +(defn scenario-c + "Clock modifying scenario" + [opts] + (combined/clock-package {:db (:db opts), :interval 1, :faults #{:clock}})) + +(defn scenario-cp + "Clock modifying + partition scenario" + [opts] + (combined/compose-packages + [(combined/clock-package {:db (:db opts), :interval 1, :faults #{:clock}}) + (combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}})])) + +(defn scenario-r + "Cluster reconfiguration scenario" + [opts] + (reconfiguration-package {:interval 1})) + +(defn scenario-pr + "Partition + cluster reconfiguration scenario" + [opts] + (combined/compose-packages + [(combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}}) + (reconfiguration-package {:interval 1})])) + +(defn scenario-cpr + "Clock scramble + partition + cluster reconfiguration scenario" + [opts] + (combined/compose-packages + [(combined/clock-package {:db (:db opts), :interval 1, :faults #{:clock}}) + (combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}}) + (reconfiguration-package {:interval 1})])) + +(defn scenario-cdp + "Clock modifying + db + partition scenario" + [opts] + (combined/compose-packages + [(combined/clock-package {:db (:db opts), :interval 1, :faults #{:clock}}) + (combined/db-package {:db (:db opts), :interval 1, :faults #{:db :pause :kill}}) + (combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}})])) + +(defn scenario-dpr + "Db + partition + cluster reconfiguration scenario" + [opts] + (combined/compose-packages + [(combined/db-package {:db (:db opts), :interval 1, :faults #{:db :pause :kill}}) + (combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}}) + (reconfiguration-package {:interval 1})])) + diff --git a/script/jepsen.garage/src/jepsen/garage/reg.clj b/script/jepsen.garage/src/jepsen/garage/reg.clj new file mode 100644 index 00000000..39708c0b --- /dev/null +++ b/script/jepsen.garage/src/jepsen/garage/reg.clj @@ -0,0 +1,143 @@ +(ns jepsen.garage.reg + (:require [clojure.tools.logging :refer :all] + [clojure.string :as str] + [clojure.set :as set] + [jepsen [checker :as checker] + [cli :as cli] + [client :as client] + [control :as c] + [db :as db] + [generator :as gen] + [independent :as independent] + [nemesis :as nemesis] + [util :as util] + [tests :as tests]] + [jepsen.checker.timeline :as timeline] + [jepsen.control.util :as cu] + [jepsen.os.debian :as debian] + [jepsen.garage.daemon :as grg] + [jepsen.garage.s3api :as s3] + [knossos.model :as model] + [slingshot.slingshot :refer [try+]])) + +(defn op-get [_ _] {:type :invoke, :f :read, :value nil}) +(defn op-put [_ _] {:type :invoke, :f :write, :value (str (rand-int 99))}) +(defn op-del [_ _] {:type :invoke, :f :write, :value nil}) + +(defrecord RegClient [creds] + client/Client + (open! [this test node] + (assoc this :creds (grg/creds node))) + (setup! [this test]) + (invoke! [this test op] + (try+ + (let [[k v] (:value op)] + (case (:f op) + :read + (util/timeout + 10000 + (assoc op :type :fail, :error ::timeout) + (let [value (s3/get (:creds this) k)] + (assoc op :type :ok, :value (independent/tuple k value)))) + :write + (util/timeout + 10000 + (assoc op :type :info, :error ::timeout) + (do + (s3/put (:creds this) k v) + (assoc op :type :ok))))) + (catch (re-find #"Unavailable" (.getMessage %)) ex + (assoc op :type :info, :error ::unavailable)) + (catch (re-find #"Broken pipe" (.getMessage %)) ex + (assoc op :type :info, :error ::broken-pipe)) + (catch (re-find #"Connection refused" (.getMessage %)) ex + (assoc op :type :info, :error ::connection-refused)))) + (teardown! [this test]) + (close! [this test])) + +(defn reg-read-after-write + "Read-after-Write checker for register operations" + [] + (reify checker/Checker + (check [this test history opts] + (let [init {:put-values {-1 nil} + :put-done #{-1} + :put-in-progress {} + :read-can-contain {} + :bad-reads #{}} + final (reduce + (fn [state op] + (let [current-values (set/union + (set (map (fn [idx] (get (:put-values state) idx)) (:put-done state))) + (set (map (fn [[_ [idx _]]] (get (:put-values state) idx)) (:put-in-progress state)))) + read-can-contain (reduce + (fn [rcc [idx v]] (assoc rcc idx (set/union current-values v))) + {} (:read-can-contain state))] + (info "--------") + (info "state: " state) + (info "current-values: " current-values) + (info "read-can-contain: " read-can-contain) + (info "op: " op) + (case [(:type op) (:f op)] + ([:invoke :write]) + (assoc state + :read-can-contain read-can-contain + :put-values (assoc (:put-values state) (:index op) (:value op)) + :put-in-progress (assoc (:put-in-progress state) (:process op) [(:index op) (:put-done state)])) + ([:ok :write]) + (let [[index overwrites] (get (:put-in-progress state) (:process op))] + (assoc state + :read-can-contain read-can-contain + :put-in-progress (dissoc (:put-in-progress state) (:process op)) + :put-done + (conj + (set/difference (:put-done state) overwrites) + index))) + ([:invoke :read]) + (assoc state + :read-can-contain (assoc read-can-contain (:process op) current-values)) + ([:ok :read]) + (let [this-read-can-contain (get read-can-contain (:process op)) + bad-reads (if (contains? this-read-can-contain (:value op)) + (:bad-reads state) + (conj (:bad-reads state) [(:process op) (:index op) (:value op) this-read-can-contain]))] + (info "this-read-can-contain: " this-read-can-contain) + (assoc state + :read-can-contain (dissoc read-can-contain (:process op)) + :bad-reads bad-reads)) + state))) + init history) + valid? (empty? (:bad-reads final))] + (assoc final :valid? valid?))))) + +(defn workload-common + "Common parts of workload" + [opts] + {:client (RegClient. nil) + :generator (independent/concurrent-generator + 10 + (range) + (fn [k] + (->> + (gen/mix [op-get op-put op-del]) + (gen/limit (:ops-per-key opts)))))}) + +(defn workload1 + "Tests linearizable reads and writes" + [opts] + (assoc (workload-common opts) + :checker (independent/checker + (checker/compose + {:linear (checker/linearizable + {:model (model/register) + :algorithm :linear}) + :timeline (timeline/html)})))) + +(defn workload2 + "Tests CRDT reads and writes" + [opts] + (assoc (workload-common opts) + :checker (independent/checker + (checker/compose + {:reg-read-after-write (reg-read-after-write) + :timeline (timeline/html)})))) diff --git a/script/jepsen.garage/src/jepsen/garage/s3api.clj b/script/jepsen.garage/src/jepsen/garage/s3api.clj new file mode 100644 index 00000000..531e0157 --- /dev/null +++ b/script/jepsen.garage/src/jepsen/garage/s3api.clj @@ -0,0 +1,48 @@ +(ns jepsen.garage.s3api + (:require [clojure.tools.logging :refer :all] + [jepsen [control :as c]] + [amazonica.aws.s3 :as s3] + [slingshot.slingshot :refer [try+]])) + +; GARAGE S3 HELPER FUNCTIONS + +(defn get + "Helper for GetObject" + [creds k] + (try+ + (-> (s3/get-object creds (:bucket creds) k) + :input-stream + slurp) + (catch (re-find #"Key not found" (.getMessage %)) ex + nil))) + +(defn put + "Helper for PutObject or DeleteObject (is a delete if value is nil)" + [creds k v] + (if (= v nil) + (s3/delete-object creds + :bucket-name (:bucket creds) + :key k) + (let [some-bytes (.getBytes v "UTF-8") + bytes-stream (java.io.ByteArrayInputStream. some-bytes)] + (s3/put-object creds + :bucket-name (:bucket creds) + :key k + :input-stream bytes-stream + :metadata {:content-length (count some-bytes)})))) + +(defn list-inner [creds prefix ct accum] + (let [list-result (s3/list-objects-v2 creds + {:bucket-name (:bucket creds) + :prefix prefix + :continuation-token ct}) + new-object-summaries (:object-summaries list-result) + new-objects (map (fn [d] (:key d)) new-object-summaries) + objects (concat new-objects accum)] + (if (:truncated? list-result) + (list-inner creds prefix (:next-continuation-token list-result) objects) + objects))) +(defn list + "Helper for ListObjects -- just lists everything in the bucket" + [creds prefix] + (list-inner creds prefix nil [])) diff --git a/script/jepsen.garage/src/jepsen/garage/set.clj b/script/jepsen.garage/src/jepsen/garage/set.clj new file mode 100644 index 00000000..2c7a2ccd --- /dev/null +++ b/script/jepsen.garage/src/jepsen/garage/set.clj @@ -0,0 +1,135 @@ +(ns jepsen.garage.set + (:require [clojure.tools.logging :refer :all] + [clojure.string :as str] + [clojure.set :as set] + [jepsen [checker :as checker] + [cli :as cli] + [client :as client] + [control :as c] + [checker :as checker] + [db :as db] + [generator :as gen] + [independent :as independent] + [nemesis :as nemesis] + [util :as util] + [tests :as tests]] + [jepsen.checker.timeline :as timeline] + [jepsen.control.util :as cu] + [jepsen.os.debian :as debian] + [jepsen.garage.daemon :as grg] + [jepsen.garage.s3api :as s3] + [knossos.model :as model] + [slingshot.slingshot :refer [try+]])) + +(defn op-add-rand100 [_ _] {:type :invoke, :f :add, :value (rand-int 100)}) +(defn op-read [_ _] {:type :invoke, :f :read, :value nil}) + +(defrecord SetClient [creds] + client/Client + (open! [this test node] + (assoc this :creds (grg/creds node))) + (setup! [this test]) + (invoke! [this test op] + (try+ + (let [[k v] (:value op) + prefix (str "set" k "/")] + (case (:f op) + :add + (util/timeout + 10000 + (assoc op :type :info, :error ::timeout) + (do + (s3/put (:creds this) (str prefix v) "present") + (assoc op :type :ok))) + :read + (util/timeout + 10000 + (assoc op :type :fail, :error ::timeout) + (do + (let [items (s3/list (:creds this) prefix)] + (let [items-stripped (map (fn [o] + (assert (str/starts-with? o prefix)) + (str/replace-first o prefix "")) items) + items-set (set (map parse-long items-stripped))] + (assoc op :type :ok, :value (independent/tuple k items-set)))))))) + (catch (re-find #"Unavailable" (.getMessage %)) ex + (assoc op :type :info, :error ::unavailable)) + (catch (re-find #"Broken pipe" (.getMessage %)) ex + (assoc op :type :info, :error ::broken-pipe)) + (catch (re-find #"Connection refused" (.getMessage %)) ex + (assoc op :type :info, :error ::connection-refused)))) + (teardown! [this test]) + (close! [this test])) + +(defn set-read-after-write + "Read-after-Write checker for set operations" + [] + (reify checker/Checker + (check [this test history opts] + (let [init {:add-started #{} + :add-done #{} + :read-must-contain {} + :missed #{} + :unexpected #{}} + final (reduce + (fn [state op] + (case [(:type op) (:f op)] + ([:invoke :add]) + (assoc state :add-started (conj (:add-started state) (:value op))) + ([:ok :add]) + (assoc state :add-done (conj (:add-done state) (:value op))) + ([:invoke :read]) + (assoc-in state [:read-must-contain (:process op)] (:add-done state)) + ([:ok :read]) + (let [read-must-contain (get (:read-must-contain state) (:process op)) + new-missed (set/difference read-must-contain (:value op)) + new-unexpected (set/difference (:value op) (:add-started state))] + (assoc state + :read-must-contain (dissoc (:read-must-contain state) (:process op)) + :missed (set/union (:missed state) new-missed), + :unexpected (set/union (:unexpected state) new-unexpected))) + state)) + init history) + valid? (and (empty? (:missed final)) (empty? (:unexpected final)))] + (assoc final :valid? valid?))))) + +(defn workload1 + "Tests insertions and deletions" + [opts] + {:client (SetClient. nil) + :checker (independent/checker + (checker/compose + {:set (checker/set) + :timeline (timeline/html)})) + :generator (independent/concurrent-generator + 10 + (range 100) + (fn [k] + (->> (range) + (map (fn [x] {:type :invoke, :f :add, :value x})) + (gen/limit (:ops-per-key opts))))) + :final-generator (independent/concurrent-generator + 10 + (range 100) + (fn [k] + (gen/phases + (gen/once op-read) + (gen/sleep 5))))}) + +(defn workload2 + "Tests insertions and deletions" + [opts] + {:client (SetClient. nil) + :checker (independent/checker + (checker/compose + {:set-read-after-write (set-read-after-write) + ; :set-full (checker/set-full {:linearizable? false}) + :timeline (timeline/html)})) + :generator (independent/concurrent-generator + 10 + (range) + (fn [k] + (->> (gen/mix [op-add-rand100 op-read]) + (gen/limit (:ops-per-key opts)))))}) + + |