aboutsummaryrefslogtreecommitdiff
path: root/script/jepsen.garage/src/jepsen/garage/reg.clj
blob: 39708c0b1ff7257225a4f9225a2c6127aff70f6b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
(ns jepsen.garage.reg
  (:require [clojure.tools.logging :refer :all]
            [clojure.string :as str]
            [clojure.set :as set]
            [jepsen [checker :as checker]
             [cli :as cli]
             [client :as client]
             [control :as c]
             [db :as db]
             [generator :as gen]
             [independent :as independent]
             [nemesis :as nemesis]
             [util :as util]
             [tests :as tests]]
            [jepsen.checker.timeline :as timeline]
            [jepsen.control.util :as cu]
            [jepsen.os.debian :as debian]
            [jepsen.garage.daemon :as grg]
            [jepsen.garage.s3api :as s3]
            [knossos.model :as model]
            [slingshot.slingshot :refer [try+]]))

(defn op-get
  "Builds a read invocation op. Both arguments are ignored; the value is nil
  because the result is only known once the read completes."
  [_test _ctx]
  {:f     :read
   :type  :invoke
   :value nil})
(defn op-put
  "Builds a write invocation op whose value is the decimal string of a random
  integer in [0, 99). Both arguments are ignored."
  [_test _ctx]
  (let [payload (-> 99 rand-int str)]
    {:f     :write
     :type  :invoke
     :value payload}))
(defn op-del
  "Builds a write invocation op carrying a nil value. Both arguments are
  ignored. NOTE(review): presumably a nil write is how a deletion is
  expressed to the S3 layer -- confirm against jepsen.garage.s3api/put."
  [_test _ctx]
  {:f     :write
   :type  :invoke
   :value nil})

(defn- error-message-matches?
  "Returns the `re-find` result (truthy) when `thrown` is a Throwable whose
  message contains a match for `pattern`; returns nil otherwise.

  Safe to use inside a slingshot catch selector: objects thrown with `throw+`
  need not be Throwables, and a Throwable may carry a nil message. Neither
  case may raise a secondary exception that would hide the original error."
  [pattern thrown]
  (when (instance? Throwable thrown)
    (when-let [msg (.getMessage ^Throwable thrown)]
      (re-find pattern msg))))

(defrecord RegClient [creds]
  client/Client
  ;; Attach the credentials obtained for `node` to this client.
  (open! [this test node]
    (assoc this :creds (grg/creds node)))
  (setup! [this test])
  ;; Executes one read or write op. The op's :value is destructured as a
  ;; [key value] pair. Returns the op with :type set to :ok, :fail or :info.
  (invoke! [this test op]
    (try+
      (let [[k v] (:value op)]
        (case (:f op)
          :read
            ;; A read that times out is reported as :fail.
            (util/timeout
              10000
              (assoc op :type :fail, :error ::timeout)
              (let [value (s3/get (:creds this) k)]
                (assoc op :type :ok, :value (independent/tuple k value))))
          :write
            ;; A write that times out is reported as :info (outcome unknown),
            ;; since it may still have been applied.
            (util/timeout
              10000
              (assoc op :type :info, :error ::timeout)
              (do
                (s3/put (:creds this) k v)
                (assoc op :type :ok)))))
      ;; Known transient errors are recorded as :info. Anything that does not
      ;; match one of these selectors propagates to the caller unchanged.
      (catch (error-message-matches? #"Unavailable" %) ex
        (assoc op :type :info, :error ::unavailable))
      (catch (error-message-matches? #"Broken pipe" %) ex
        (assoc op :type :info, :error ::broken-pipe))
      (catch (error-message-matches? #"Connection refused" %) ex
        (assoc op :type :info, :error ::connection-refused))))
  (teardown! [this test])
  (close! [this test]))

(defn reg-read-after-write
  "Builds a checker that folds over the history and flags every completed read
  whose value was not among the values considered readable at some step while
  that read was outstanding.

  The result is the final fold state plus :valid?, which is true when no bad
  read was recorded. State keys:
    :put-values       write index -> value written (index -1 is the initial nil)
    :put-done         indices of completed writes not yet superseded
    :put-in-progress  process -> [write index, :put-done snapshot at invoke]
    :read-can-contain process -> set of values an outstanding read may return
    :bad-reads        set of [process index value allowed-values] tuples"
  []
  (reify checker/Checker
    (check [_this _test history _opts]
      (letfn [(step [state op]
                (let [{:keys [put-values put-done put-in-progress bad-reads]} state
                      {:keys [process index value]} op
                      ;; Values of completed writes and of writes still in flight.
                      done-values (set (map #(get put-values %) put-done))
                      pending-values (set (map (fn [[_ [idx _]]] (get put-values idx))
                                               put-in-progress))
                      current-values (set/union done-values pending-values)
                      ;; Every outstanding read may also observe whatever is
                      ;; currently readable.
                      read-can-contain (reduce-kv
                                         (fn [acc proc allowed]
                                           (assoc acc proc (set/union current-values allowed)))
                                         {}
                                         (:read-can-contain state))]
                  (info "--------")
                  (info "state: " state)
                  (info "current-values: " current-values)
                  (info "read-can-contain: " read-can-contain)
                  (info "op: " op)
                  (case [(:type op) (:f op)]
                    ;; Write starts: remember its value and which completed
                    ;; writes it will supersede once it finishes.
                    [:invoke :write]
                    (assoc state
                           :read-can-contain read-can-contain
                           :put-values (assoc put-values index value)
                           :put-in-progress (assoc put-in-progress process [index put-done]))

                    ;; Write finishes: it replaces the writes that were done
                    ;; when it started.
                    [:ok :write]
                    (let [[started-index superseded] (get put-in-progress process)]
                      (assoc state
                             :read-can-contain read-can-contain
                             :put-in-progress (dissoc put-in-progress process)
                             :put-done (-> put-done
                                           (set/difference superseded)
                                           (conj started-index))))

                    ;; Read starts: it may return anything currently readable.
                    [:invoke :read]
                    (assoc state
                           :read-can-contain (assoc read-can-contain process current-values))

                    ;; Read finishes: record it as bad if its value was never
                    ;; allowed during its lifetime.
                    [:ok :read]
                    (let [allowed (get read-can-contain process)
                          updated-bad-reads (cond-> bad-reads
                                              (not (contains? allowed value))
                                              (conj [process index value allowed]))]
                      (info "this-read-can-contain: " allowed)
                      (assoc state
                             :read-can-contain (dissoc read-can-contain process)
                             :bad-reads updated-bad-reads))

                    ;; Any other op leaves the state untouched.
                    state)))]
        (let [initial {:put-values {-1 nil}
                       :put-done #{-1}
                       :put-in-progress {}
                       :read-can-contain {}
                       :bad-reads #{}}
              outcome (reduce step initial history)]
          (assoc outcome :valid? (empty? (:bad-reads outcome))))))))

(defn workload-common
  "Returns the map shared by every workload: a fresh RegClient with no
  credentials yet, and an independent concurrent generator (10 threads per
  key, keys drawn from `(range)`) that mixes get/put/del ops, limited to
  `(:ops-per-key opts)` ops per key."
  [opts]
  (let [per-key-gen (fn [k]
                      (gen/limit (:ops-per-key opts)
                                 (gen/mix [op-get op-put op-del])))]
    {:client    (->RegClient nil)
     :generator (independent/concurrent-generator 10 (range) per-key-gen)}))

(defn workload1
  "Common workload plus a per-key checker that composes a linearizability
  check (register model, :linear algorithm) with an HTML timeline."
  [opts]
  (let [base        (workload-common opts)
        key-checker (checker/compose
                      {:linear   (checker/linearizable
                                   {:model     (model/register)
                                    :algorithm :linear})
                       :timeline (timeline/html)})]
    (assoc base :checker (independent/checker key-checker))))

(defn workload2
  "Common workload plus a per-key checker that composes the read-after-write
  register checker with an HTML timeline."
  [opts]
  (let [base        (workload-common opts)
        key-checker (checker/compose
                      {:reg-read-after-write (reg-read-after-write)
                       :timeline             (timeline/html)})]
    (assoc base :checker (independent/checker key-checker))))