aboutsummaryrefslogtreecommitdiff
path: root/script/jepsen.garage/src/jepsen
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2024-01-11 10:52:12 +0000
committerAlex <alex@adnab.me>2024-01-11 10:52:12 +0000
commit723e56b37f13f078a15e067343191fb1bf96e8b2 (patch)
tree8188e7e8bc41867ce2de61cd7df6554e05678d11 /script/jepsen.garage/src/jepsen
parenta8b0e01f88b947bc34c05d818d51860b4d171967 (diff)
parentfa9247f11b89c960dffe82d6bf990ed4335788e3 (diff)
downloadgarage-723e56b37f13f078a15e067343191fb1bf96e8b2.tar.gz
garage-723e56b37f13f078a15e067343191fb1bf96e8b2.zip
Merge pull request 'Jepsen testing (NLnet task 3 subtask 1)' (#544) from jepsen into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/544
Diffstat (limited to 'script/jepsen.garage/src/jepsen')
-rw-r--r--script/jepsen.garage/src/jepsen/garage.clj105
-rw-r--r--script/jepsen.garage/src/jepsen/garage/daemon.clj152
-rw-r--r--script/jepsen.garage/src/jepsen/garage/nemesis.clj142
-rw-r--r--script/jepsen.garage/src/jepsen/garage/reg.clj143
-rw-r--r--script/jepsen.garage/src/jepsen/garage/s3api.clj48
-rw-r--r--script/jepsen.garage/src/jepsen/garage/set.clj135
6 files changed, 725 insertions, 0 deletions
diff --git a/script/jepsen.garage/src/jepsen/garage.clj b/script/jepsen.garage/src/jepsen/garage.clj
new file mode 100644
index 00000000..446b81de
--- /dev/null
+++ b/script/jepsen.garage/src/jepsen/garage.clj
@@ -0,0 +1,105 @@
+(ns jepsen.garage
+ (:require
+ [clojure.string :as str]
+ [jepsen
+ [checker :as checker]
+ [cli :as cli]
+ [generator :as gen]
+ [nemesis :as nemesis]
+ [tests :as tests]]
+ [jepsen.os.debian :as debian]
+ [jepsen.garage
+ [daemon :as grg]
+ [nemesis :as grgNemesis]
+ [reg :as reg]
+ [set :as set]]))
+
+(def workloads
+ "A map of workload names to functions that construct workloads, given opts."
+ {"reg1" reg/workload1
+ "reg2" reg/workload2
+ "set1" set/workload1
+ "set2" set/workload2})
+
+(def scenari
+ "A map of scenari to the associated nemesis"
+ {"c" grgNemesis/scenario-c
+ "cp" grgNemesis/scenario-cp
+ "r" grgNemesis/scenario-r
+ "pr" grgNemesis/scenario-pr
+ "cpr" grgNemesis/scenario-cpr
+ "cdp" grgNemesis/scenario-cdp
+ "dpr" grgNemesis/scenario-dpr})
+
+(def patches
+ "A map of patch names to Garage builds"
+ {"default" "v0.9.0"
+ "tsfix1" "d146cdd5b66ca1d3ed65ce93ca42c6db22defc09"
+ "tsfix2" "c82d91c6bccf307186332b6c5c6fc0b128b1b2b1"
+ "task3a" "707442f5de416fdbed4681a33b739f0a787b7834"
+ "task3b" "431b28e0cfdc9cac6c649193cf602108a8b02997"
+ "task3c" "0041b013a473e3ae72f50209d8f79db75a72848b"})
+
+(def cli-opts
+ "Additional command line options."
+ [["-p" "--patch NAME" "Garage patch to use"
+ :default "default"
+ :validate [patches (cli/one-of patches)]]
+ ["-s" "--scenario NAME" "Nemesis scenario to run"
+ :default "cp"
+ :validate [scenari (cli/one-of scenari)]]
+ ["-r" "--rate HZ" "Approximate number of requests per second, per thread."
+ :default 10
+ :parse-fn read-string
+ :validate [#(and (number? %) (pos? %)) "Must be a positive number"]]
+ [nil "--ops-per-key NUM" "Maximum number of operations on any given key."
+ :default 100
+ :parse-fn parse-long
+ :validate [pos? "Must be a positive integer."]]
+ ["-w" "--workload NAME" "Workload of test to run"
+ :default "reg1"
+ :validate [workloads (cli/one-of workloads)]]])
+
+(defn garage-test
+ "Given an options map from the command line runner (e.g. :nodes, :ssh,
+ :concurrency, ...), constructs a test map."
+ [opts]
+ (let [garage-version (get patches (:patch opts))
+ db (grg/db garage-version)
+ workload ((get workloads (:workload opts)) opts)
+ scenario ((get scenari (:scenario opts)) (assoc opts :db db))]
+ (merge tests/noop-test
+ opts
+ {:pure-generators true
+ :name (str "garage-" (name (:patch opts)) " " (name (:workload opts)) " " (name (:scenario opts)))
+ :os debian/os
+ :db db
+ :client (:client workload)
+ :generator (gen/phases
+ (->>
+ (:generator workload)
+ (gen/stagger (/ (:rate opts)))
+ (gen/nemesis (:generator scenario))
+ (gen/time-limit (:time-limit opts)))
+ (gen/log "Healing cluster")
+ (gen/nemesis (:final-generator scenario))
+ (gen/log "Waiting for recovery")
+ (gen/sleep 10)
+ (gen/log "Running final generator")
+ (gen/clients (:final-generator workload))
+ (gen/log "Generators all done"))
+ :nemesis (:nemesis scenario)
+ :checker (checker/compose
+ {:perf (checker/perf (:perf scenario))
+ :workload (:checker workload)})
+ })))
+
+
+(defn -main
+ "Handles command line arguments. Can either run a test, or a web server for
+ browsing results."
+ [& args]
+ (cli/run! (merge (cli/single-test-cmd {:test-fn garage-test
+ :opt-spec cli-opts})
+ (cli/serve-cmd))
+ args))
diff --git a/script/jepsen.garage/src/jepsen/garage/daemon.clj b/script/jepsen.garage/src/jepsen/garage/daemon.clj
new file mode 100644
index 00000000..d407dd29
--- /dev/null
+++ b/script/jepsen.garage/src/jepsen/garage/daemon.clj
@@ -0,0 +1,152 @@
+(ns jepsen.garage.daemon
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen [control :as c]
+ [core :as jepsen]
+ [db :as db]]
+ [jepsen.control.util :as cu]))
+
+; CONSTANTS -- HOW GARAGE IS SET UP
+
+(def base-dir "/opt/garage")
+(def data-dir (str base-dir "/data"))
+(def meta-dir (str base-dir "/meta"))
+(def binary (str base-dir "/garage"))
+(def logfile (str base-dir "/garage.log"))
+(def pidfile (str base-dir "/garage.pid"))
+
+(def admin-token "icanhazadmin")
+(def access-key-id "GK8bfb6a51286071c6c9cd8bc3")
+(def secret-access-key "b0be95f71c1c6f16858a9edf395078b75c12ecb6b1c03385c4ae92076e4994a3")
+(def bucket-name "jepsen")
+
+; THE GARAGE DB
+
+(defn install!
+ "Download and install Garage"
+ [node version]
+ (c/su
+ (c/trace
+ (info node "installing garage" version)
+ (c/exec :mkdir :-p base-dir)
+ (let [url (str "https://garagehq.deuxfleurs.fr/_releases/" version "/x86_64-unknown-linux-musl/garage")
+ cache (cu/cached-wget! url)]
+ (c/exec :cp cache binary))
+ (c/exec :chmod :+x binary))))
+
+(defn configure!
+ "Configure Garage"
+ [node]
+ (c/su
+ (c/trace
+ (cu/write-file!
+ (str "rpc_secret = \"0fffabe52542c2b89a56b2efb7dfd477e9dafb285c9025cbdf1de7ca21a6b372\"\n"
+ "rpc_bind_addr = \"0.0.0.0:3901\"\n"
+ "rpc_public_addr = \"" node ":3901\"\n"
+ "db_engine = \"lmdb\"\n"
+ "replication_mode = \"2\"\n"
+ "data_dir = \"" data-dir "\"\n"
+ "metadata_dir = \"" meta-dir "\"\n"
+ "[s3_api]\n"
+ "s3_region = \"us-east-1\"\n"
+ "api_bind_addr = \"0.0.0.0:3900\"\n"
+ "[k2v_api]\n"
+ "api_bind_addr = \"0.0.0.0:3902\"\n"
+ "[admin]\n"
+ "api_bind_addr = \"0.0.0.0:3903\"\n"
+ "admin_token = \"" admin-token "\"\n"
+ "trace_sink = \"http://192.168.56.1:4317\"\n")
+ "/etc/garage.toml"))))
+
+(defn connect-node!
+ "Connect a Garage node to the rest of the cluster"
+ [test node]
+ (c/trace
+ (let [node-id (c/exec binary :node :id :-q)]
+ (info node "node id:" node-id)
+ (c/on-many (:nodes test)
+ (c/exec binary :node :connect node-id)))))
+
+(defn configure-node!
+ "Configure a Garage node to be part of a cluster layout"
+ [test node]
+ (c/trace
+ (let [node-id (c/exec binary :node :id :-q)]
+ (c/on (jepsen/primary test)
+ (c/exec binary :layout :assign (subs node-id 0 16) :-c :1G :-z :dc1 :-t node)))))
+
+(defn finalize-config!
+ "Apply the layout and create a key/bucket pair in the cluster"
+ [node]
+ (c/trace
+ (c/exec binary :layout :apply :--version 1)
+ (info node "garage status:" (c/exec binary :status))
+ (c/exec binary :key :import access-key-id secret-access-key :--yes)
+ (c/exec binary :bucket :create bucket-name)
+ (c/exec binary :bucket :allow :--read :--write bucket-name :--key access-key-id)
+ (info node "key info: " (c/exec binary :key :info access-key-id))))
+
+(defn db
+ "Garage DB for a particular version"
+ [version]
+ (reify db/DB
+ (setup! [_ test node]
+ (install! node version)
+ (configure! node)
+ (cu/start-daemon!
+ {:logfile logfile
+ :pidfile pidfile
+ :chdir base-dir
+ :env {:RUST_LOG "garage=debug,garage_api=trace"}}
+ binary
+ :server)
+ (c/exec :sleep 3)
+
+ (jepsen/synchronize test)
+ (connect-node! test node)
+
+ (jepsen/synchronize test)
+ (configure-node! test node)
+
+ (jepsen/synchronize test)
+ (when (= node (jepsen/primary test))
+ (finalize-config! node)))
+
+ (teardown! [_ test node]
+ (info node "tearing down garage" version)
+ (c/su
+ (cu/stop-daemon! binary pidfile)
+ (c/exec :rm :-rf logfile)
+ (c/exec :rm :-rf data-dir)
+ (c/exec :rm :-rf meta-dir)))
+
+ db/Pause
+ (pause! [_ test node]
+ (cu/grepkill! :stop binary))
+ (resume! [_ test node]
+ (cu/grepkill! :cont binary))
+
+ db/Kill
+ (kill! [_ test node]
+ (cu/stop-daemon! binary pidfile))
+ (start! [_ test node]
+ (cu/start-daemon!
+ {:logfile logfile
+ :pidfile pidfile
+ :chdir base-dir
+ :env {:RUST_LOG "garage=debug,garage_api=trace"}}
+ binary
+ :server))
+
+ db/LogFiles
+ (log-files [_ test node]
+ [logfile])))
+
+(defn creds
+ "Obtain Garage credentials for node"
+ [node]
+ {:access-key access-key-id
+ :secret-key secret-access-key
+ :endpoint (str "http://" node ":3900")
+ :bucket bucket-name
+ :client-config {:path-style-access-enabled true}})
+
diff --git a/script/jepsen.garage/src/jepsen/garage/nemesis.clj b/script/jepsen.garage/src/jepsen/garage/nemesis.clj
new file mode 100644
index 00000000..dfce0255
--- /dev/null
+++ b/script/jepsen.garage/src/jepsen/garage/nemesis.clj
@@ -0,0 +1,142 @@
+(ns jepsen.garage.nemesis
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen [control :as c]
+ [core :as jepsen]
+ [generator :as gen]
+ [nemesis :as nemesis]]
+ [jepsen.nemesis.combined :as combined]
+ [jepsen.garage.daemon :as grg]
+ [jepsen.control.util :as cu]))
+
+; ---- reconfiguration nemesis ----
+
+(defn configure-present!
+ "Configure node to be active in new cluster layout"
+ [test nodes]
+ (info "configure-present!" nodes)
+ (let [node-ids (c/on-many nodes (c/exec grg/binary :node :id :-q))
+ node-id-strs (map (fn [[_ v]] (subs v 0 16)) node-ids)]
+ (c/on
+ (jepsen/primary test)
+ (apply c/exec (concat [grg/binary :layout :assign :-c :1G] node-id-strs)))))
+
+(defn configure-absent!
+ "Configure nodes to be active in new cluster layout"
+ [test nodes]
+ (info "configure-absent!" nodes)
+ (let [node-ids (c/on-many nodes (c/exec grg/binary :node :id :-q))
+ node-id-strs (map (fn [[_ v]] (subs v 0 16)) node-ids)]
+ (c/on
+ (jepsen/primary test)
+ (apply c/exec (concat [grg/binary :layout :assign :-g] node-id-strs)))))
+
+(defn finalize-config!
+ "Apply the proposed cluster layout"
+ [test]
+ (let [layout-show (c/on (jepsen/primary test) (c/exec grg/binary :layout :show))
+ [_ layout-next-version] (re-find #"apply --version (\d+)\n" layout-show)]
+ (if layout-next-version
+ (do
+ (info "layout show: " layout-show "; next-version: " layout-next-version)
+ (c/on (jepsen/primary test)
+ (c/exec grg/binary :layout :apply :--version layout-next-version)))
+ (info "no layout changes to apply"))))
+
+(defn reconfigure-subset
+ "Reconfigure cluster with only a subset of nodes"
+ [cnt]
+ (reify nemesis/Nemesis
+ (setup! [this test] this)
+
+ (invoke! [this test op] op
+ (case (:f op)
+ :start
+ (let [[keep-nodes remove-nodes]
+ (->> (:nodes test)
+ shuffle
+ (split-at cnt))]
+ (info "layout split: keep " keep-nodes ", remove " remove-nodes)
+ (configure-present! test keep-nodes)
+ (configure-absent! test remove-nodes)
+ (finalize-config! test)
+ (assoc op :value keep-nodes))
+ :stop
+ (do
+ (info "layout un-split: all nodes=" (:nodes test))
+ (configure-present! test (:nodes test))
+ (finalize-config! test)
+ (assoc op :value (:nodes test)))))
+
+ (teardown! [this test] this)))
+
+; ---- nemesis scenari ----
+
+(defn nemesis-op
+ "A generator for a single nemesis operation"
+ [op]
+ (fn [_ _] {:type :info, :f op}))
+
+(defn reconfiguration-package
+ "Cluster reconfiguration nemesis package"
+ [opts]
+ {:generator (->>
+ (gen/mix [(nemesis-op :reconfigure-start)
+ (nemesis-op :reconfigure-stop)])
+ (gen/stagger (:interval opts 5)))
+ :final-generator {:type :info, :f :reconfigure-stop}
+ :nemesis (nemesis/compose
+ {{:reconfigure-start :start
+ :reconfigure-stop :stop} (reconfigure-subset 3)})
+ :perf #{{:name "reconfigure"
+ :start #{:reconfigure-start}
+ :stop #{:reconfigur-stop}
+ :color "#A197E9"}}})
+
+(defn scenario-c
+ "Clock modifying scenario"
+ [opts]
+ (combined/clock-package {:db (:db opts), :interval 1, :faults #{:clock}}))
+
+(defn scenario-cp
+ "Clock modifying + partition scenario"
+ [opts]
+ (combined/compose-packages
+ [(combined/clock-package {:db (:db opts), :interval 1, :faults #{:clock}})
+ (combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}})]))
+
+(defn scenario-r
+ "Cluster reconfiguration scenario"
+ [opts]
+ (reconfiguration-package {:interval 1}))
+
+(defn scenario-pr
+ "Partition + cluster reconfiguration scenario"
+ [opts]
+ (combined/compose-packages
+ [(combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}})
+ (reconfiguration-package {:interval 1})]))
+
+(defn scenario-cpr
+ "Clock scramble + partition + cluster reconfiguration scenario"
+ [opts]
+ (combined/compose-packages
+ [(combined/clock-package {:db (:db opts), :interval 1, :faults #{:clock}})
+ (combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}})
+ (reconfiguration-package {:interval 1})]))
+
+(defn scenario-cdp
+ "Clock modifying + db + partition scenario"
+ [opts]
+ (combined/compose-packages
+ [(combined/clock-package {:db (:db opts), :interval 1, :faults #{:clock}})
+ (combined/db-package {:db (:db opts), :interval 1, :faults #{:db :pause :kill}})
+ (combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}})]))
+
+(defn scenario-dpr
+ "Db + partition + cluster reconfiguration scenario"
+ [opts]
+ (combined/compose-packages
+ [(combined/db-package {:db (:db opts), :interval 1, :faults #{:db :pause :kill}})
+ (combined/partition-package {:db (:db opts), :interval 1, :faults #{:partition}})
+ (reconfiguration-package {:interval 1})]))
+
diff --git a/script/jepsen.garage/src/jepsen/garage/reg.clj b/script/jepsen.garage/src/jepsen/garage/reg.clj
new file mode 100644
index 00000000..39708c0b
--- /dev/null
+++ b/script/jepsen.garage/src/jepsen/garage/reg.clj
@@ -0,0 +1,143 @@
+(ns jepsen.garage.reg
+ (:require [clojure.tools.logging :refer :all]
+ [clojure.string :as str]
+ [clojure.set :as set]
+ [jepsen [checker :as checker]
+ [cli :as cli]
+ [client :as client]
+ [control :as c]
+ [db :as db]
+ [generator :as gen]
+ [independent :as independent]
+ [nemesis :as nemesis]
+ [util :as util]
+ [tests :as tests]]
+ [jepsen.checker.timeline :as timeline]
+ [jepsen.control.util :as cu]
+ [jepsen.os.debian :as debian]
+ [jepsen.garage.daemon :as grg]
+ [jepsen.garage.s3api :as s3]
+ [knossos.model :as model]
+ [slingshot.slingshot :refer [try+]]))
+
+(defn op-get [_ _] {:type :invoke, :f :read, :value nil})
+(defn op-put [_ _] {:type :invoke, :f :write, :value (str (rand-int 99))})
+(defn op-del [_ _] {:type :invoke, :f :write, :value nil})
+
+(defrecord RegClient [creds]
+ client/Client
+ (open! [this test node]
+ (assoc this :creds (grg/creds node)))
+ (setup! [this test])
+ (invoke! [this test op]
+ (try+
+ (let [[k v] (:value op)]
+ (case (:f op)
+ :read
+ (util/timeout
+ 10000
+ (assoc op :type :fail, :error ::timeout)
+ (let [value (s3/get (:creds this) k)]
+ (assoc op :type :ok, :value (independent/tuple k value))))
+ :write
+ (util/timeout
+ 10000
+ (assoc op :type :info, :error ::timeout)
+ (do
+ (s3/put (:creds this) k v)
+ (assoc op :type :ok)))))
+ (catch (re-find #"Unavailable" (.getMessage %)) ex
+ (assoc op :type :info, :error ::unavailable))
+ (catch (re-find #"Broken pipe" (.getMessage %)) ex
+ (assoc op :type :info, :error ::broken-pipe))
+ (catch (re-find #"Connection refused" (.getMessage %)) ex
+ (assoc op :type :info, :error ::connection-refused))))
+ (teardown! [this test])
+ (close! [this test]))
+
+(defn reg-read-after-write
+ "Read-after-Write checker for register operations"
+ []
+ (reify checker/Checker
+ (check [this test history opts]
+ (let [init {:put-values {-1 nil}
+ :put-done #{-1}
+ :put-in-progress {}
+ :read-can-contain {}
+ :bad-reads #{}}
+ final (reduce
+ (fn [state op]
+ (let [current-values (set/union
+ (set (map (fn [idx] (get (:put-values state) idx)) (:put-done state)))
+ (set (map (fn [[_ [idx _]]] (get (:put-values state) idx)) (:put-in-progress state))))
+ read-can-contain (reduce
+ (fn [rcc [idx v]] (assoc rcc idx (set/union current-values v)))
+ {} (:read-can-contain state))]
+ (info "--------")
+ (info "state: " state)
+ (info "current-values: " current-values)
+ (info "read-can-contain: " read-can-contain)
+ (info "op: " op)
+ (case [(:type op) (:f op)]
+ ([:invoke :write])
+ (assoc state
+ :read-can-contain read-can-contain
+ :put-values (assoc (:put-values state) (:index op) (:value op))
+ :put-in-progress (assoc (:put-in-progress state) (:process op) [(:index op) (:put-done state)]))
+ ([:ok :write])
+ (let [[index overwrites] (get (:put-in-progress state) (:process op))]
+ (assoc state
+ :read-can-contain read-can-contain
+ :put-in-progress (dissoc (:put-in-progress state) (:process op))
+ :put-done
+ (conj
+ (set/difference (:put-done state) overwrites)
+ index)))
+ ([:invoke :read])
+ (assoc state
+ :read-can-contain (assoc read-can-contain (:process op) current-values))
+ ([:ok :read])
+ (let [this-read-can-contain (get read-can-contain (:process op))
+ bad-reads (if (contains? this-read-can-contain (:value op))
+ (:bad-reads state)
+ (conj (:bad-reads state) [(:process op) (:index op) (:value op) this-read-can-contain]))]
+ (info "this-read-can-contain: " this-read-can-contain)
+ (assoc state
+ :read-can-contain (dissoc read-can-contain (:process op))
+ :bad-reads bad-reads))
+ state)))
+ init history)
+ valid? (empty? (:bad-reads final))]
+ (assoc final :valid? valid?)))))
+
+(defn workload-common
+ "Common parts of workload"
+ [opts]
+ {:client (RegClient. nil)
+ :generator (independent/concurrent-generator
+ 10
+ (range)
+ (fn [k]
+ (->>
+ (gen/mix [op-get op-put op-del])
+ (gen/limit (:ops-per-key opts)))))})
+
+(defn workload1
+ "Tests linearizable reads and writes"
+ [opts]
+ (assoc (workload-common opts)
+ :checker (independent/checker
+ (checker/compose
+ {:linear (checker/linearizable
+ {:model (model/register)
+ :algorithm :linear})
+ :timeline (timeline/html)}))))
+
+(defn workload2
+ "Tests CRDT reads and writes"
+ [opts]
+ (assoc (workload-common opts)
+ :checker (independent/checker
+ (checker/compose
+ {:reg-read-after-write (reg-read-after-write)
+ :timeline (timeline/html)}))))
diff --git a/script/jepsen.garage/src/jepsen/garage/s3api.clj b/script/jepsen.garage/src/jepsen/garage/s3api.clj
new file mode 100644
index 00000000..531e0157
--- /dev/null
+++ b/script/jepsen.garage/src/jepsen/garage/s3api.clj
@@ -0,0 +1,48 @@
+(ns jepsen.garage.s3api
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen [control :as c]]
+ [amazonica.aws.s3 :as s3]
+ [slingshot.slingshot :refer [try+]]))
+
+; GARAGE S3 HELPER FUNCTIONS
+
+(defn get
+ "Helper for GetObject"
+ [creds k]
+ (try+
+ (-> (s3/get-object creds (:bucket creds) k)
+ :input-stream
+ slurp)
+ (catch (re-find #"Key not found" (.getMessage %)) ex
+ nil)))
+
+(defn put
+ "Helper for PutObject or DeleteObject (is a delete if value is nil)"
+ [creds k v]
+ (if (= v nil)
+ (s3/delete-object creds
+ :bucket-name (:bucket creds)
+ :key k)
+ (let [some-bytes (.getBytes v "UTF-8")
+ bytes-stream (java.io.ByteArrayInputStream. some-bytes)]
+ (s3/put-object creds
+ :bucket-name (:bucket creds)
+ :key k
+ :input-stream bytes-stream
+ :metadata {:content-length (count some-bytes)}))))
+
+(defn list-inner [creds prefix ct accum]
+ (let [list-result (s3/list-objects-v2 creds
+ {:bucket-name (:bucket creds)
+ :prefix prefix
+ :continuation-token ct})
+ new-object-summaries (:object-summaries list-result)
+ new-objects (map (fn [d] (:key d)) new-object-summaries)
+ objects (concat new-objects accum)]
+ (if (:truncated? list-result)
+ (list-inner creds prefix (:next-continuation-token list-result) objects)
+ objects)))
+(defn list
+ "Helper for ListObjects -- just lists everything in the bucket"
+ [creds prefix]
+ (list-inner creds prefix nil []))
diff --git a/script/jepsen.garage/src/jepsen/garage/set.clj b/script/jepsen.garage/src/jepsen/garage/set.clj
new file mode 100644
index 00000000..2c7a2ccd
--- /dev/null
+++ b/script/jepsen.garage/src/jepsen/garage/set.clj
@@ -0,0 +1,135 @@
+(ns jepsen.garage.set
+ (:require [clojure.tools.logging :refer :all]
+ [clojure.string :as str]
+ [clojure.set :as set]
+ [jepsen [checker :as checker]
+ [cli :as cli]
+ [client :as client]
+ [control :as c]
+ [checker :as checker]
+ [db :as db]
+ [generator :as gen]
+ [independent :as independent]
+ [nemesis :as nemesis]
+ [util :as util]
+ [tests :as tests]]
+ [jepsen.checker.timeline :as timeline]
+ [jepsen.control.util :as cu]
+ [jepsen.os.debian :as debian]
+ [jepsen.garage.daemon :as grg]
+ [jepsen.garage.s3api :as s3]
+ [knossos.model :as model]
+ [slingshot.slingshot :refer [try+]]))
+
+(defn op-add-rand100 [_ _] {:type :invoke, :f :add, :value (rand-int 100)})
+(defn op-read [_ _] {:type :invoke, :f :read, :value nil})
+
+(defrecord SetClient [creds]
+ client/Client
+ (open! [this test node]
+ (assoc this :creds (grg/creds node)))
+ (setup! [this test])
+ (invoke! [this test op]
+ (try+
+ (let [[k v] (:value op)
+ prefix (str "set" k "/")]
+ (case (:f op)
+ :add
+ (util/timeout
+ 10000
+ (assoc op :type :info, :error ::timeout)
+ (do
+ (s3/put (:creds this) (str prefix v) "present")
+ (assoc op :type :ok)))
+ :read
+ (util/timeout
+ 10000
+ (assoc op :type :fail, :error ::timeout)
+ (do
+ (let [items (s3/list (:creds this) prefix)]
+ (let [items-stripped (map (fn [o]
+ (assert (str/starts-with? o prefix))
+ (str/replace-first o prefix "")) items)
+ items-set (set (map parse-long items-stripped))]
+ (assoc op :type :ok, :value (independent/tuple k items-set))))))))
+ (catch (re-find #"Unavailable" (.getMessage %)) ex
+ (assoc op :type :info, :error ::unavailable))
+ (catch (re-find #"Broken pipe" (.getMessage %)) ex
+ (assoc op :type :info, :error ::broken-pipe))
+ (catch (re-find #"Connection refused" (.getMessage %)) ex
+ (assoc op :type :info, :error ::connection-refused))))
+ (teardown! [this test])
+ (close! [this test]))
+
+(defn set-read-after-write
+ "Read-after-Write checker for set operations"
+ []
+ (reify checker/Checker
+ (check [this test history opts]
+ (let [init {:add-started #{}
+ :add-done #{}
+ :read-must-contain {}
+ :missed #{}
+ :unexpected #{}}
+ final (reduce
+ (fn [state op]
+ (case [(:type op) (:f op)]
+ ([:invoke :add])
+ (assoc state :add-started (conj (:add-started state) (:value op)))
+ ([:ok :add])
+ (assoc state :add-done (conj (:add-done state) (:value op)))
+ ([:invoke :read])
+ (assoc-in state [:read-must-contain (:process op)] (:add-done state))
+ ([:ok :read])
+ (let [read-must-contain (get (:read-must-contain state) (:process op))
+ new-missed (set/difference read-must-contain (:value op))
+ new-unexpected (set/difference (:value op) (:add-started state))]
+ (assoc state
+ :read-must-contain (dissoc (:read-must-contain state) (:process op))
+ :missed (set/union (:missed state) new-missed),
+ :unexpected (set/union (:unexpected state) new-unexpected)))
+ state))
+ init history)
+ valid? (and (empty? (:missed final)) (empty? (:unexpected final)))]
+ (assoc final :valid? valid?)))))
+
+(defn workload1
+ "Tests insertions and deletions"
+ [opts]
+ {:client (SetClient. nil)
+ :checker (independent/checker
+ (checker/compose
+ {:set (checker/set)
+ :timeline (timeline/html)}))
+ :generator (independent/concurrent-generator
+ 10
+ (range 100)
+ (fn [k]
+ (->> (range)
+ (map (fn [x] {:type :invoke, :f :add, :value x}))
+ (gen/limit (:ops-per-key opts)))))
+ :final-generator (independent/concurrent-generator
+ 10
+ (range 100)
+ (fn [k]
+ (gen/phases
+ (gen/once op-read)
+ (gen/sleep 5))))})
+
+(defn workload2
+ "Tests insertions and deletions"
+ [opts]
+ {:client (SetClient. nil)
+ :checker (independent/checker
+ (checker/compose
+ {:set-read-after-write (set-read-after-write)
+ ; :set-full (checker/set-full {:linearizable? false})
+ :timeline (timeline/html)}))
+ :generator (independent/concurrent-generator
+ 10
+ (range)
+ (fn [k]
+ (->> (gen/mix [op-add-rand100 op-read])
+ (gen/limit (:ops-per-key opts)))))})
+
+