Examples

These scripts are examples we use in the tests.

You can find them in https://github.com/tarekziade/molotov/tree/master/molotov/tests

"""

This Molotov script has:

- a global setup fixture that sets a global headers dict
- an init worker fixture that sets the session headers
- 3 scenario
- 2 tear downs fixtures

"""
import json
from molotov import (
    scenario,
    setup,
    global_setup,
    global_teardown,
    teardown,
    get_context,
)


_API = "http://localhost:8080"
_HEADERS = {}


# notice that the global setup, global teardown and teardown
# are not a coroutine.
@global_setup()
def init_test(args):
    _HEADERS["SomeHeader"] = "1"


@global_teardown()
def end_test():
    print("This is the end")


@setup()
async def init_worker(worker_num, args):
    headers = {"AnotherHeader": "1"}
    headers.update(_HEADERS)
    return {"headers": headers}


@teardown()
def end_worker(worker_num):
    print("This is the end for %d" % worker_num)


# @scenario(weight=40)
async def scenario_one(session):
    async with session.get(_API) as resp:
        if get_context(session).statsd:
            get_context(session).statsd.incr("BLEH")
        res = await resp.json()
        assert res["result"] == "OK"
        assert resp.status == 200


@scenario(weight=30)
async def scenario_two(session):
    async with session.get(_API) as resp:
        assert resp.status == 200


# @scenario(weight=30)
async def scenario_three(session):
    somedata = json.dumps({"OK": 1})
    async with session.post(_API, data=somedata) as resp:
        assert resp.status == 200
"""

This Molotov script has 2 scenario

"""
from molotov import scenario


_API = "http://localhost:8080"


@scenario(weight=40)
async def scenario_one(session):
    async with session.get(_API) as resp:
        res = await resp.json()
        assert res["result"] == "OK"
        assert resp.status == 200


@scenario(weight=60)
async def scenario_two(session):
    async with session.get(_API) as resp:
        assert resp.status == 200
"""

This Molotov script has:

- a global setup fixture that sets variables
- an init worker fixture that sets the session headers
- an init session that attachs an object to the current session
- 1 scenario
- 2 tear downs fixtures

"""
import molotov


class SomeObject(object):
    """Does something smart in real life with the async loop."""

    def __init__(self, loop):
        self.loop = loop

    def cleanup(self):
        pass


@molotov.global_setup()
def init_test(args):
    molotov.set_var("SomeHeader", "1")
    molotov.set_var("endpoint", "http://localhost:8080")


@molotov.setup()
async def init_worker(worker_num, args):
    headers = {"AnotherHeader": "1", "SomeHeader": molotov.get_var("SomeHeader")}
    return {"headers": headers}


@molotov.setup_session()
async def init_session(worker_num, session):
    molotov.get_context(session).attach("ob", SomeObject(loop=session.loop))


@molotov.scenario(100)
async def scenario_one(session):
    endpoint = molotov.get_var("endpoint")
    async with session.get(endpoint) as resp:
        res = await resp.json()
        assert res["result"] == "OK"
        assert resp.status == 200


@molotov.teardown_session()
async def end_session(worker_num, session):
    molotov.get_context(session).ob.cleanup()


@molotov.teardown()
def end_worker(worker_num):
    print("This is the end for %d" % worker_num)


@molotov.global_teardown()
def end_test():
    print("This is the end of the test.")
"""

This Molotov script demonstrates how to hook events.

"""

import molotov


@molotov.events()
async def print_request(event, **info):
    if event == "sending_request":
        print("=>")


@molotov.events()
async def print_response(event, **info):
    if event == "response_received":
        print("<=")


@molotov.scenario(100)
async def scenario_one(session):
    async with session.get("http://localhost:8080") as resp:
        res = await resp.json()
        assert res["result"] == "OK"
        assert resp.status == 200
"""

This Molotov script show how you can print
the average response time.

"""
import molotov
import time


_T = {}


def _now():
    return time.time() * 1000


@molotov.events()
async def record_time(event, **info):
    req = info.get("request")
    if event == "sending_request":
        _T[req] = _now()
    elif event == "response_received":
        _T[req] = _now() - _T[req]


@molotov.global_teardown()
def display_average():
    average = sum(_T.values()) / len(_T)
    print("\nAverage response time %dms" % average)
"""

This Molotov script uses events to display concurrency info

"""
import molotov
import time


concurs = []  # [(timestamp, worker count)]


def _now():
    return time.time() * 1000


@molotov.events()
async def record_time(event, **info):
    if event == "current_workers":
        concurs.append((_now(), info["workers"]))


@molotov.global_teardown()
def display_average():
    print("\nconcurrencies: %s", concurs)
    delta = max(ts for ts, _ in concurs) - min(ts for ts, _ in concurs)
    average = sum(value for _, value in concurs) * 1000 / delta
    print("\nAverage concurrency: %.2f VU/s" % average)
"""

This Molotov script uses events to generate a success/failure output

"""
import json
import molotov
import time

_T = {}


def _now():
    return time.time() * 1000


@molotov.events()
async def record_time(event, **info):
    if event == "scenario_start":
        scenario = info["scenario"]
        index = (info["wid"], scenario["name"])
        _T[index] = _now()
    if event == "scenario_success":
        scenario = info["scenario"]
        index = (info["wid"], scenario["name"])
        start_time = _T.pop(index, None)
        duration = int(_now() - start_time)
        if start_time is not None:
            print(
                json.dumps(
                    {
                        "ts": time.time(),
                        "type": "scenario_success",
                        "name": scenario["name"],
                        "duration": duration,
                    }
                )
            )
    elif event == "scenario_failure":
        scenario = info["scenario"]
        exception = info["exception"]
        index = (info["wid"], scenario["name"])
        start_time = _T.pop(index, None)
        duration = int(_now() - start_time)
        if start_time is not None:
            print(
                json.dumps(
                    {
                        "ts": time.time(),
                        "type": "scenario_failure",
                        "name": scenario["name"],
                        "exception": exception.__class__.__name__,
                        "errorMessage": str(exception),
                        "duration": duration,
                    }
                )
            )