(ns jepsen.garage.reg
  (:require [clojure.tools.logging :refer :all]
            [clojure.string :as str]
            [clojure.set :as set]
            [jepsen [checker :as checker]
             [cli :as cli]
             [client :as client]
             [control :as c]
             [db :as db]
             [generator :as gen]
             [independent :as independent]
             [nemesis :as nemesis]
             [util :as util]
             [tests :as tests]]
            [jepsen.checker.timeline :as timeline]
            [jepsen.control.util :as cu]
            [jepsen.os.debian :as debian]
            [jepsen.garage.daemon :as grg]
            [jepsen.garage.s3api :as s3]
            [knossos.model :as model]
            [slingshot.slingshot :refer [try+]]))

(defn op-get [_ _] {:type :invoke, :f :read, :value nil})
(defn op-put [_ _] {:type :invoke, :f :write, :value (str (rand-int 99))})
(defn op-del [_ _] {:type :invoke, :f :write, :value nil})

(defrecord RegClient [creds]
  client/Client
  (open! [this test node]
    (assoc this :creds (grg/creds node)))
  (setup! [this test])
  (invoke! [this test op]
    (try+
      (let [[k v] (:value op)]
        (case (:f op)
          :read
            (util/timeout
              10000
              (assoc op :type :fail, :error ::timeout)
              (let [value (s3/get (:creds this) k)]
                (assoc op :type :ok, :value (independent/tuple k value))))
          :write
            (util/timeout
              10000
              (assoc op :type :info, :error ::timeout)
              (do
                (s3/put (:creds this) k v)
                (assoc op :type :ok)))))
      (catch (re-find #"Unavailable" (.getMessage %)) ex
        (assoc op :type :info, :error ::unavailable))
      (catch (re-find #"Broken pipe" (.getMessage %)) ex
        (assoc op :type :info, :error ::broken-pipe))
      (catch (re-find #"Connection refused" (.getMessage %)) ex
        (assoc op :type :info, :error ::connection-refused))))
  (teardown! [this test])
  (close! [this test]))

(defn reg-read-after-write
  "Read-after-Write checker for register operations"
  []
  (reify checker/Checker
    (check [this test history opts]
      (let [init {:put-values {-1 nil}
                  :put-done #{-1}
                  :put-in-progress {}
                  :read-can-contain {}
                  :bad-reads #{}}
            final (reduce
                    (fn [state op]
                      (let [current-values (set/union
                                             (set (map (fn [idx] (get (:put-values state) idx)) (:put-done state)))
                                             (set (map (fn [[_ [idx _]]] (get (:put-values state) idx)) (:put-in-progress state))))
                            read-can-contain (reduce
                                               (fn [rcc [idx v]] (assoc rcc idx (set/union current-values v)))
                                               {} (:read-can-contain state))]
                      (info "--------")
                      (info "state: " state)
                      (info "current-values: " current-values)
                      (info "read-can-contain: " read-can-contain)
                      (info "op: " op)
                      (case [(:type op) (:f op)]
                        ([:invoke :write])
                          (assoc state
                                 :read-can-contain read-can-contain
                                 :put-values (assoc (:put-values state) (:index op) (:value op))
                                 :put-in-progress (assoc (:put-in-progress state) (:process op) [(:index op) (:put-done state)]))
                        ([:ok :write])
                          (let [[index overwrites] (get (:put-in-progress state) (:process op))]
                            (assoc state
                                   :read-can-contain read-can-contain
                                   :put-in-progress (dissoc (:put-in-progress state) (:process op))
                                   :put-done
                                     (conj
                                       (set/difference (:put-done state) overwrites)
                                       index)))
                        ([:invoke :read])
                         (assoc state
                                :read-can-contain (assoc read-can-contain (:process op) current-values))
                        ([:ok :read])
                         (let [this-read-can-contain (get read-can-contain (:process op))
                               bad-reads (if (contains? this-read-can-contain (:value op))
                                           (:bad-reads state)
                                           (conj (:bad-reads state) [(:process op) (:index op) (:value op) this-read-can-contain]))]
                           (info "this-read-can-contain: " this-read-can-contain)
                           (assoc state
                                  :read-can-contain (dissoc read-can-contain (:process op))
                                  :bad-reads bad-reads))
                        state)))
                    init history)
            valid? (empty? (:bad-reads final))]
        (assoc final :valid? valid?)))))

(defn workload-common
  "Common parts of workload"
  [opts]
  {:client           (RegClient. nil)
   :generator        (independent/concurrent-generator
                       10
                       (range)
                       (fn [k]
                         (->>
                           (gen/mix [op-get op-put op-del])
                           (gen/limit (:ops-per-key opts)))))})

(defn workload1
  "Tests linearizable reads and writes"
  [opts]
  (assoc (workload-common opts)
         :checker (independent/checker
                    (checker/compose
                      {:linear (checker/linearizable
                                 {:model     (model/register)
                                  :algorithm :linear})
                       :timeline (timeline/html)}))))

(defn workload2
  "Tests CRDT reads and writes"
  [opts]
  (assoc (workload-common opts)
         :checker (independent/checker
                    (checker/compose
                      {:reg-read-after-write (reg-read-after-write)
                       :timeline (timeline/html)}))))