#!/bin/ash
#
# Wrapper script around wrk service mesh benchmark
#  for data export to Prometheus push GW
#

conn=10
rps=10
dur=60
init_delay=0
pgw="http://pushgateway.monitoring:9091/metrics/job/wrk2_benchmark/instance/test_run"

usage() {
    echo "Usage:"
    echo "  docker run -ti quay.io/kinvolk/wrk2-srvmesh-bench \\"
    echo "        [-c <num-of-concurrent-connections>] \\"
    echo "        [-r <num-of-requests-per-second>] \\"
    echo "        [-d <duration>] \\"
    echo "        [-p <push-gateway>] \\"
    echo "     http[s]://>service-1>/<endpoint-1> \\"
    echo "     http[s]://>service-1>/<endpoint-2> \\"
    echo "     http[s]://>service-2>/<endpoint-1> \\"
    echo "     http[s]://>service-2>/<endpoint-2> \\"
    echo "        ..."
    echo
    echo " -c <connections> - Number of concurrent connections. Default: $conn"
    echo " -r <rps> - Target rate of requests per second. Default: $rps"
    echo " -d <duration> - Test duration in seconds. Default: $dur"
    echo " -i <duration> - Initial delay (sleep time before start) in seconds"
    echo "                 Default: $init_delay"
    echo " -p <push-gateway> - URL of prometheus push gateway. Default: $pgw"
    echo "                     Use 'stdout' to just print to standard output."
    echo "                     Use 'null' to suppress output (for debugging)."
    echo
}

# --

[ -z "$1" -o "help" = "$1" -o "-h" = "$1" -o "--help" = "$1" ] && {
    usage
    exit 0
}

servers=""

i=1; next=""
for arg do
    [ "$arg" = "-c" ] &&  { next="conn"; continue; }
    [ "$arg" = "-r" ] &&  { next="rps"; continue; }
    [ "$arg" = "-d" ] &&  { next="dur"; continue; }
    [ "$arg" = "-p" ] &&  { next="pgw"; continue; }
    [ "$arg" = "-i" ] &&  { next="init_delay"; continue; }

    [ -n "$next" ] && { eval $next="$arg"; next=""; continue; }

    servers="$servers $arg"
done

[ $init_delay -ne 0 ] && {
    echo "Init delay: sleeping for $init_delay seconds."
    sleep "$init_delay"
    echo "Slept well, now starting benchmark"
}

# set up CURL command to push to Prometheus
if [ "$pgw" = "stdout" ] ; then
    curl_pgw="cat -"
elif [ "$pgw" = "null" ] ; then
    curl_pgw="true"
else
    # Add RPS to URL
    curl_pgw="curl -s --data-binary @- $pgw/rps/$rps"

    # check if gateway is available, retry a few times
    n=0
    until [ "$n" -ge 10 ]
    do
        echo -e "# TYPE wrk2_benchmark_progress counter\nwrk2_benchmark_progress 0" \
            | $curl_pgw && break
        n=$((n+1))
        sleep 2
        echo "retrying POSTing to prometheus push gateway..."
    done

    echo -e "# TYPE wrk2_benchmark_progress counter\nwrk2_benchmark_progress 0" \
        | $curl_pgw

    if [ $? -ne 0 ]; then
        echo
        echo "ERROR posting to prometheus push gateway $pgw"
        echo
        usage
        echo
        echo "Aborting due to errors."
        exit 1
    fi
fi
export curl_pgw

duration_mul=$(echo "$dur" \
                   | awk '/s$/{print "1"} /m$/{print "60"} /h$/{print "3600"}')
[ -z "$duration_mul" ] && duration_mul=1
duration_val=$(echo "$dur" | sed 's/[^0-9]\+//g')
duration_s=$(( $duration_val * $duration_mul ))

echo
echo "Running stresser with:"
echo "   conn: $conn"
echo "   rps: $rps"
echo "   duration: $dur ($duration_s s)"
echo "   prometheus push-gateway: $pgw"
echo "   servers: $servers"
echo

sleep 1

# we use files to exchange per-thread matrics; use ram-backed fs for this file
# I/O to prevent threads being stuck in IOWAIT
cd /tmpfs

# ---------------------

prometheus_pusher() {
    local num_threads="$1"
    local req_rps="$2"
    local duration="$3"
    local curl_pgw="$4"

    local iter=0
    local init=1

    local start_ts=""
    local last_load_report=$(date +%s)

    while [ ! -f "benchmark-end.txt" ] ; do
        now=$(date +%s)

        mcount=$(ls -1 thread-*_seq-${iter}.txt 2>/dev/null | wc -l)

        # wrk2 still initialising?
        [ $init -eq 1 ] && {
            grep -E \
                  '^Initialised [0-9]+ threads in [0-9]+ ms.' stats 2>/dev/null
            if [ $? -ne 0 ] ; then
                init_progress=$(($mcount * 100 / num_threads))
                cat << EOF | $curl_pgw
# TYPE wrk2_benchmark_progress counter
wrk2_benchmark_progress{status="init"} $init_progress
EOF
                echo "INIT Progress: $init_progress% , ${mcount} / ${num_threads} threads"
                sleep 1
                continue
            else
                init=0
                cat << EOF | $curl_pgw
# TYPE wrk2_benchmark_progress counter
wrk2_benchmark_progress{status="init"} 100
wrk2_benchmark_progress{status="run"} 0
EOF
                start_ts=$now
            fi
        }

        # wait until all threads reported for this iteration
        [ $mcount -lt $num_threads ] && {
            sleep 0.01 # yield CPU
            continue; }

        runtime=$(($now - $start_ts))
        progress=$(($runtime * 100 / $duration))
        [ $progress -gt 100 ] && progress=100

        # runtime info
        cat << EOF | $curl_pgw
# TYPE wrk2_benchmark_requests counter
# TYPE wrk2_benchmark_responses counter
# TYPE wrk2_benchmark_average_rps gauge
# TYPE wrk2_benchmark_current_rps gauge
# TYPE wrk2_benchmark_average_tcp_reconnect_rate gauge
# TYPE wrk2_benchmark_current_tcp_reconnect_rate gauge
# TYPE wrk2_benchmark_rgauge
# TYPE wrk2_benchmark_progress counter
# TYPE wrk2_benchmark_thread_count counter
# TYPE wrk2_benchmark_duration counter
wrk2_benchmark_progress{status="init"} 100
wrk2_benchmark_progress{status="run"} $progress
wrk2_benchmark_thread_count $num_threads
wrk2_benchmark_duration $duration
$(cat thread-*_seq-${iter}.txt | grep -v 'wrk2_benchmark_run_')
EOF

        # Results info, pushed after run concluded
        cat << EOF > wrk2_benchmark_run.txt
# TYPE wrk2_benchmark_run_average_tcp_reconnect_rate gauge
$(cat thread-*_seq-${iter}.txt | grep 'wrk2_benchmark_run_')
$(echo "$loadavg" | sed 's/wrk2_benchmark_node_/wrk2_benchmark_run_node_/')
EOF

        echo "RUN Progress: $progress% , ${runtime}s / ${duration}s"
        rm thread-*_seq-${iter}.txt
        iter=$(($iter+1))
    done

    echo "All done!"
}

# ---------------------

start_ts=$(date +%s)
rm -f thread-*_seq-*.txt "benchmark-end.txt"

prometheus_pusher "$conn" "$rps" "$duration_s" "$curl_pgw" &

# split off first endpoint, generate LUA include file with all endpoints
# to spped up thread start-up when many endpoints are defined
first_server=$(echo $servers | sed 's/ .*//g')
count=0
echo 'input_endpoints={}' > endpoints.lua
for e in $servers; do
    echo "input_endpoints[$count] = \"$e\" " >> endpoints.lua
    count=$((count+1))
done

# wait until first endpoint becomes available.
echo "Waiting for '$first_server' to become available"
timeout=120
st=$(date +%s)
ts="$st"
while [ $((st + timeout)) -ge $ts ]; do
    echo "    Trying $first_server (for $((st+timeout-ts)) more seconds) ..."
    curl -s -m 5 "$first_server" >/dev/null && break
    ts=$(date +%s)
    sleep 1
done
echo "'$first_server' responded, starting benchmark."
echo "----"
echo

unbuffer /usr/local/bin/wrk \
              -s /usr/local/bin/multiple-endpoints-prometheus-metrics.lua \
              --lua-dont-pass-body \
              -L -R "$rps" -c "$conn" -t "$conn" -d "$dur" \
              $first_server \
    | unbuffer -p tee stats

> "benchmark-end.txt"

sleep 2
kill -9 %1
wait

echo ""

# ---------------------

end_ts=$(date +%s)

# set sum gauges to 0, progress to 100, update wrk2_benchmark_run metrics
per_thread_info=""
for i in $(seq $conn); do
    per_thread_info="wrk2_benchmark_requests{label=\"thread-${i}\"} 0\n$per_thread_info"
    per_thread_info="wrk2_benchmark_responses{label=\"thread-${i}\"} 0\n$per_thread_info"
    per_thread_info="wrk2_benchmark_average_rps{label=\"thread-${i}\"} 0\n$per_thread_info"
    per_thread_info="wrk2_benchmark_current_rps{label=\"thread-${i}\"} 0\n$per_thread_info"
    per_thread_info="wrk2_benchmark_average_tcp_reconnect_rate{label=\"thread-${i}\"} 0\n$per_thread_info"
    per_thread_info="wrk2_benchmark_current_tcp_reconnect_rate{label=\"thread-${i}\"} 0\n$per_thread_info"
done

cat << EOF | $curl_pgw
# TYPE wrk2_benchmark_requests counter
# TYPE wrk2_benchmark_responses counter
# TYPE wrk2_benchmark_average_rps gauge
# TYPE wrk2_benchmark_current_rps gauge
# TYPE wrk2_benchmark_average_tcp_reconnect_rate gauge
# TYPE wrk2_benchmark_current_tcp_reconnect_rate gauge
# TYPE wrk2_benchmark_duration counter
$(echo -e $per_thread_info)
wrk2_benchmark_duration 0

# TYPE wrk2_benchmark_run_requested_rps gauge
# TYPE wrk2_benchmark_run_thread_count counter
wrk2_benchmark_run_requested_rps $rps
wrk2_benchmark_run_thread_count $conn
$(cat wrk2_benchmark_run.txt)

# TYPE wrk2_benchmark_run_runtime gauge
wrk2_benchmark_run_runtime{kind="start"} $start_ts
wrk2_benchmark_run_runtime{kind="end"} $end_ts
wrk2_benchmark_run_runtime{kind="duration"} $duration_s
# TYPE wrk2_benchmark_progress counter
wrk2_benchmark_progress{status="init"} 100
wrk2_benchmark_progress{status="run"} 100
wrk2_benchmark_progress{status="done"} 100
EOF

# send final wrk2 stats
awk '
    # ---------------------------

    # simple latency spectrum
    /^[ 0-9.]+%/ {
        if (latency) {
            percentile = gensub(/%.*/, "", "g", $1) / 100
            val  = gensub(/([0-9.]+).*/,"\\1","g",$2)
            unit = gensub(/[0-9.]+(.*)/,"\\1","g",$2)

            mul = 1
            if (unit == "us") mul=0.0001
            if (unit == "s")  mul=1000
            if (unit == "m")  mul=60000

            print "wrk2_benchmark_latency_ms{p=\"" percentile "\"} " val*mul
        }
    }

    /Latency Distribution/ {
        print "# TYPE wrk2_benchmark_latency_ms counter"
        latency=1
    }

    # detailed latency spectrum
    /^#/ { detailed=0 }

    {
        if (detailed) {
            detailed = detailed + 1
            if (detailed >= 3 && $2 != prev_perc) {
                print "wrk2_benchmark_latency_detailed_ms{p=\"" $2 "\"} " $1
                prev_perc=$2
            }
        }
    }

    /Detailed Percentile spectrum:/ {
        latency=0
        print "# TYPE wrk2_benchmark_latency_detailed_ms counter"
        detailed=1
    }


    # overall RPS
    /Requests\/sec:/ {
        print "# TYPE wrk2_benchmark_run_rps counter"
        print "wrk2_benchmark_run_rps " $2
    }

    # overall requests
    /Total Requests:/{
        print "# TYPE wrk2_benchmark_run_requests counter"
        print "wrk2_benchmark_run_requests " $3
    }

    # socket error stats - not present if there are no errors
    #   Socket errors: connect 0, read 25, write 0, timeout 0
    { s_connect=0; s_read=0; s_write=0; s_timeout=0; }
    /Socket errors:/ {
        s_connect = gensub(/,/,"","g",$4)
        s_read    = gensub(/,/,"","g",$6)
        s_write   = gensub(/,/,"","g",$8)
        s_timeout = $10
    }

    #   HTTP errors: 0
    /HTTP errors:/ {
        print "# TYPE wrk2_benchmark_http_errors counter"
        print "wrk2_benchmark_http_errors " $3
    }

    #   Requests timed out: 0
    /Requests timed out:/ {
        print "# TYPE wrk2_benchmark_requests_timed_out counter"
        print "wrk2_benchmark_requests_timed_out " $4
    }

    #   Bytes received: 1519608191
    /Bytes received:/ {
        print "# TYPE wrk2_benchmark_bytes_read counter"
        print "wrk2_benchmark_bytes_read " $3
    }

    #    http://192.168.1.244/index.txt : 5397500
    {
        if (call_count)
            url_call_count[$1] = url_call_count[$1] + $3
    }

    # Call count of URLs
    /URL call count/ {
        call_count=1
    }

    END {
        print "# TYPE wrk2_benchmark_url_call_count counter"
        for (url in url_call_count)
            print "wrk2_benchmark_url_call_count{url=\"" url "\"} " \
                                                            url_call_count[url]
        print "# TYPE wrk2_benchmark_socket_errors counter"
        print "wrk2_benchmark_socket_errors{t=\"connect\"} " s_connect
        print "wrk2_benchmark_socket_errors{t=\"read\"} " s_read
        print "wrk2_benchmark_socket_errors{t=\"write\"} " s_write
        print "wrk2_benchmark_socket_errors{t=\"timeout\"} " s_timeout
    }

    # ---------------------------
    ' stats \
 | $curl_pgw


# gracefully shut down istio/linkerd sidecar if one exists
sidecar=$(ps ax  | grep -v grep | grep -oE '(linkerd2-proxy|pilot-agent)')
[ -z "$sidecar" ] && exit 0

grace_max=10
grace="$grace_max"
while true; do
    pkill "$sidecar"
    if [ $? -eq 0 ]; then
        echo "Sent SIGTERM to $sidecar"
        sleep 1
        grace="$grace_max"
        continue
    fi

    if [ $grace -gt 0 ]; then
        sleep 1
        echo "Proxy $sidecar shutdown grace period: $grace"
        grace=$(($grace-1))
        continue
    fi
    break
done

# Rule #2: Double Tap
pkill -9 "$sidecar"

# exit successfully
true
