aboutsummaryrefslogtreecommitdiff
path: root/script/jepsen.garage/src/jepsen/garage/set.clj
blob: c538746230da4cc25fb1c92f6fdf504bfac53534 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
(ns jepsen.garage.set
  (:require [clojure.tools.logging :refer :all]
            [clojure.string :as str]
            [clojure.set :as set]
            [jepsen [checker :as checker]
             [cli :as cli]
             [client :as client]
             [control :as c]
             [checker :as checker]
             [db :as db]
             [generator :as gen]
             [independent :as independent]
             [nemesis :as nemesis]
             [util :as util]
             [tests :as tests]]
            [jepsen.checker.timeline :as timeline]
            [jepsen.control.util :as cu]
            [jepsen.os.debian :as debian]
            [jepsen.garage.daemon :as grg]
            [jepsen.garage.s3api :as s3]
            [knossos.model :as model]
            [slingshot.slingshot :refer [try+]]))

(defn op-add-rand100
  "Builds an :add invocation whose :value is a fresh random integer in
  [0, 100). Both arguments are ignored."
  [_ _]
  (let [n (rand-int 100)]
    {:type :invoke, :f :add, :value n}))
(defn op-read
  "Builds a :read invocation with a nil :value. Both arguments are ignored."
  [_ _]
  {:type  :invoke
   :f     :read
   :value nil})

(defrecord SetClient [creds]
  client/Client

  ; Returns a copy of this client carrying the credentials that grg/creds
  ; yields for `node`.
  (open! [this test node]
    (assoc this :creds (grg/creds node)))

  ; Nothing to prepare.
  (setup! [this test])

  ; The op's :value is destructured as a [k v] pair. Every element of set k is
  ; stored as an object named "set<k>/<element>", so reading set k is a prefix
  ; listing.
  (invoke! [this test op]
    (let [[k v]  (:value op)
          prefix (str "set" k "/")
          creds  (:creds this)]
      (case (:f op)
        ; Store one object for the element; on timeout the op completes as
        ; :info with error ::timeout.
        :add
          (util/timeout
            10000
            (assoc op :type :info, :error ::timeout)
            (do
              (s3/put creds (str prefix v) "present")
              (assoc op :type :ok)))
        ; List everything under the prefix and turn the names back into a set
        ; of longs; on timeout the op completes as :fail with error ::timeout.
        :read
          (util/timeout
            10000
            (assoc op :type :fail, :error ::timeout)
            (do
              (info "call s3/list creds: " creds ", prefix:" prefix)
              (let [items (s3/list creds prefix)]
                (info "list results for prefix" prefix ":" items " (node:" (:endpoint creds) ")")
                (let [strip-prefix (fn [o]
                                     ; every listed name must carry the prefix
                                     (assert (str/starts-with? o prefix))
                                     (str/replace-first o prefix ""))
                      items-set (into #{}
                                      (map (comp parse-long strip-prefix))
                                      items)]
                  (assoc op :type :ok, :value (independent/tuple k items-set)))))))))

  ; Nothing to clean up.
  (teardown! [this test])

  ; No connection state to release.
  (close! [this test]))

(defn set-read-after-write
  "Read-after-Write checker for set operations.

  Folds over the history once, tracking:
    :add-started       values of every invoked :add
    :add-done          values of every :add that completed :ok
    :read-must-contain per-process snapshot of :add-done taken when that
                       process invoked a :read that has not completed :ok yet
    :missed            values a completed read should have contained (they
                       were in the snapshot taken at its invocation) but
                       did not
    :unexpected        values a completed read returned although no :add for
                       them had been invoked

  Returns the final state map with :valid? added; :valid? is true only when
  both :missed and :unexpected are empty."
  []
  (reify checker/Checker
    (check [this test history opts]
      (let [init {:add-started #{}
                  :add-done #{}
                  :read-must-contain {}
                  :missed #{}
                  :unexpected #{}}
            final (reduce
                    (fn [state op]
                      (case [(:type op) (:f op)]
                        ([:invoke :add])
                          (update state :add-started conj (:value op))
                        ([:ok :add])
                          (update state :add-done conj (:value op))
                        ([:invoke :read])
                          ; remember what was already acknowledged when this
                          ; read started
                          (assoc-in state [:read-must-contain (:process op)] (:add-done state))
                        ([:ok :read])
                          ; `get` takes the map first and the key second; with
                          ; the arguments the other way round the lookup is
                          ; always nil and no missed element is ever reported.
                          (let [read-must-contain (get (:read-must-contain state) (:process op))
                                new-missed (set/difference read-must-contain (:value op))
                                new-unexpected (set/difference (:value op) (:add-started state))]
                            (-> state
                                (update :read-must-contain dissoc (:process op))
                                (update :missed set/union new-missed)
                                (update :unexpected set/union new-unexpected)))
                        ; any other [type f] combination leaves the state as is
                        state))
                    init history)
            valid? (and (empty? (:missed final)) (empty? (:unexpected final)))]
        (assoc final :valid? valid?)))))

(defn workload1
  "Add-only set workload followed by a final read of every key.

  For each key in (range 100) the main generator emits :add invocations with
  values 0, 1, 2, ... limited to (:ops-per-key opts) operations. The final
  generator issues one read per key, sequentially, and then sleeps. Results
  are checked per key with checker/set, and an HTML timeline is rendered.

  Returns a map with :client, :checker, :generator and :final-generator."
  [opts]
  (let [key-count    100
        add-op       (fn [x] {:type :invoke, :f :add, :value x})
        adds-for-key (fn [k]
                       (->> (range)
                            (map add-op)
                            (gen/limit (:ops-per-key opts))))
        read-for-key (fn [k] (gen/once op-read))
        per-key-checker (checker/compose
                          {:set (checker/set)
                           :timeline (timeline/html)})]
    {:client          (SetClient. nil)
     :checker         (independent/checker per-key-checker)
     :generator       (independent/concurrent-generator
                        10
                        (range key-count)
                        adds-for-key)
     :final-generator (gen/phases
                        (independent/sequential-generator
                          (range key-count)
                          read-for-key)
                        (gen/sleep 5))}))

(defn workload2
  "Mixed add/read set workload over an unbounded sequence of keys.

  For each key the generator mixes random adds (op-add-rand100) with reads
  (op-read), limited to (:ops-per-key opts) operations. Results are checked
  per key with checker/set-full (with :linearizable? false) and the
  set-read-after-write checker, and an HTML timeline is rendered.

  Returns a map with :client, :checker and :generator."
  [opts]
  (let [ops-for-key (fn [k]
                      (->> (gen/mix [op-add-rand100 op-read])
                           (gen/limit (:ops-per-key opts))))
        per-key-checker (checker/compose
                          {:set-full (checker/set-full {:linearizable? false})
                           :set-read-after-write (set-read-after-write)
                           :timeline (timeline/html)})]
    {:client    (SetClient. nil)
     :checker   (independent/checker per-key-checker)
     :generator (independent/concurrent-generator
                  10
                  (range)
                  ops-for-key)}))