Examples
These scripts are examples we use in the tests.
You can find them in https://github.com/tarekziade/molotov/tree/master/molotov/tests
"""
This Molotov script has:
- a global setup fixture that sets a global headers dict
- an init worker fixture that sets the session headers
- 3 scenarios (only scenario_two is enabled; the other two have their decorator commented out)
- 2 teardown fixtures
"""
import json
from molotov import (
scenario,
setup,
global_setup,
global_teardown,
teardown,
get_context,
)
# URL requested by every scenario in this script.
_API = "http://localhost:8080"
# Module-level headers dict: filled by init_test, merged into each
# worker's headers by init_worker.
_HEADERS = {}
# Notice that the global setup, global teardown and teardown
# fixtures are plain functions, not coroutines.
@global_setup()
def init_test(args):
    """Global setup fixture: store ``SomeHeader`` in the shared ``_HEADERS`` dict."""
    _HEADERS.update({"SomeHeader": "1"})
@global_teardown()
def end_test():
    """Global teardown fixture: print a closing message."""
    farewell = "This is the end"
    print(farewell)
@setup()
async def init_worker(worker_num, args):
    """Worker setup fixture: build the worker's headers.

    Returns a dict whose ``headers`` entry holds ``AnotherHeader`` plus
    everything currently stored in the module-level ``_HEADERS``.
    """
    # Entries from _HEADERS come last so they win on a key clash.
    return {"headers": {"AnotherHeader": "1", **_HEADERS}}
@teardown()
def end_worker(worker_num):
    """Worker teardown fixture: print a closing message with the worker number."""
    message = "This is the end for %d" % worker_num
    print(message)
# @scenario(weight=40)
async def scenario_one(session):
    """GET ``_API``, bump the ``BLEH`` statsd counter if statsd is set, check the reply.

    The JSON body must carry ``"result": "OK"`` and the status must be 200.
    """
    async with session.get(_API) as resp:
        statsd = get_context(session).statsd
        if statsd:
            statsd.incr("BLEH")
        payload = await resp.json()
        assert payload["result"] == "OK"
        assert resp.status == 200
@scenario(weight=30)
async def scenario_two(session):
    """GET ``_API`` and require a 200 status."""
    async with session.get(_API) as resp:
        status = resp.status
        assert status == 200
# @scenario(weight=30)
async def scenario_three(session):
    """POST a small JSON document to ``_API`` and require a 200 status."""
    async with session.post(_API, data=json.dumps({"OK": 1})) as resp:
        assert resp.status == 200
"""
This Molotov script has 2 scenarios
"""
from molotov import scenario
# URL requested by both scenarios in this script.
_API = "http://localhost:8080"
@scenario(weight=40)
async def scenario_one(session):
    """GET ``_API``; the JSON body must say ``"result": "OK"`` with status 200."""
    async with session.get(_API) as resp:
        payload = await resp.json()
        assert payload["result"] == "OK"
        assert resp.status == 200
@scenario(weight=60)
async def scenario_two(session):
    """GET ``_API`` and require a 200 status."""
    async with session.get(_API) as resp:
        status = resp.status
        assert status == 200
"""
This Molotov script has:
- a global setup fixture that sets variables
- an init worker fixture that sets the session headers
- an init session fixture that attaches an object to the current session
- 1 scenario
- 3 teardown fixtures (session, worker and global)
"""
import molotov
class SomeObject:
    """Does something smart in real life with the async loop.

    In this example it only keeps a reference to the loop it is given.
    """

    def __init__(self, loop):
        # Remember the loop handed in by the caller.
        self.loop = loop

    def cleanup(self):
        """Placeholder hook: there is nothing to release in this example."""
        return None
@molotov.global_setup()
def init_test(args):
    """Global setup fixture: publish the ``SomeHeader`` and ``endpoint`` variables."""
    shared_vars = (
        ("SomeHeader", "1"),
        ("endpoint", "http://localhost:8080"),
    )
    for name, value in shared_vars:
        molotov.set_var(name, value)
@molotov.setup()
async def init_worker(worker_num, args):
    """Worker setup fixture: build the worker's headers.

    Returns a dict whose ``headers`` entry holds ``AnotherHeader`` and the
    ``SomeHeader`` value read back through ``molotov.get_var``.
    """
    some_header = molotov.get_var("SomeHeader")
    return {"headers": {"AnotherHeader": "1", "SomeHeader": some_header}}
@molotov.setup_session()
async def init_session(worker_num, session):
    """Session setup fixture: attach a ``SomeObject`` to the session context as ``ob``."""
    context = molotov.get_context(session)
    helper = SomeObject(loop=session.loop)
    context.attach("ob", helper)
@molotov.scenario(100)
async def scenario_one(session):
    """GET the ``endpoint`` variable's URL; expect ``"result": "OK"`` and status 200."""
    async with session.get(molotov.get_var("endpoint")) as resp:
        payload = await resp.json()
        assert payload["result"] == "OK"
        assert resp.status == 200
@molotov.teardown_session()
async def end_session(worker_num, session):
    """Session teardown fixture: call ``cleanup()`` on the context's ``ob`` object."""
    attached = molotov.get_context(session).ob
    attached.cleanup()
@molotov.teardown()
def end_worker(worker_num):
    """Worker teardown fixture: print a closing message with the worker number."""
    message = "This is the end for %d" % worker_num
    print(message)
@molotov.global_teardown()
def end_test():
    """Global teardown fixture: print a closing message."""
    farewell = "This is the end of the test."
    print(farewell)
"""
This Molotov script demonstrates how to hook events.
"""
import molotov
@molotov.events()
async def print_request(event, **info):
    """Event hook: print ``=>`` for every ``sending_request`` event."""
    if event != "sending_request":
        return
    print("=>")
@molotov.events()
async def print_response(event, **info):
    """Event hook: print ``<=`` for every ``response_received`` event."""
    if event != "response_received":
        return
    print("<=")
@molotov.scenario(100)
async def scenario_one(session):
    """GET the local test server; expect ``"result": "OK"`` and status 200."""
    target = "http://localhost:8080"
    async with session.get(target) as resp:
        payload = await resp.json()
        assert payload["result"] == "OK"
        assert resp.status == 200
"""
This Molotov script shows how you can print
the average response time.
"""
import molotov
import time
# Per-request timing data in milliseconds, keyed by request object;
# written by record_time and averaged by display_average.
_T = {}
def _now():
    """Return the current ``time.time()`` value converted to milliseconds."""
    seconds = time.time()
    return seconds * 1000
# Send timestamps (ms) of requests whose response has not arrived yet.
_PENDING = {}


@molotov.events()
async def record_time(event, **info):
    """Event hook: record how long each request takes, in milliseconds.

    On ``sending_request`` the current time is remembered for the request.
    On ``response_received`` the elapsed time is stored in ``_T`` under the
    same request. Other events are ignored.

    Start timestamps are kept in ``_PENDING`` rather than in ``_T`` so that
    a request which never gets a response cannot leave a raw timestamp
    among the durations. A response with no recorded start is skipped
    instead of raising ``KeyError``.
    """
    req = info.get("request")
    if event == "sending_request":
        _PENDING[req] = _now()
    elif event == "response_received":
        started = _PENDING.pop(req, None)
        if started is not None:
            _T[req] = _now() - started
@molotov.global_teardown()
def display_average():
    """Global teardown fixture: print the mean of the values stored in ``_T``.

    When ``_T`` is empty a short notice is printed instead, which avoids
    the ``ZeroDivisionError`` the plain average would raise.
    """
    if not _T:
        print("\nAverage response time: no request recorded")
        return
    average = sum(_T.values()) / len(_T)
    print("\nAverage response time %dms" % average)
"""
This Molotov script uses events to display concurrency info
"""
import molotov
import time
# Samples appended by record_time on each "current_workers" event,
# read back by display_average.
concurs = [] # [(timestamp, worker count)]
def _now():
    """Return the current ``time.time()`` value converted to milliseconds."""
    seconds = time.time()
    return seconds * 1000
@molotov.events()
async def record_time(event, **info):
    """Event hook: on ``current_workers``, append ``(now in ms, workers)`` to ``concurs``."""
    if event != "current_workers":
        return
    sample = (_now(), info["workers"])
    concurs.append(sample)
@molotov.global_teardown()
def display_average():
    """Global teardown fixture: print the recorded samples and their average.

    The average is the sum of the recorded worker counts divided by the
    time span, in seconds, between the earliest and latest sample. If no
    sample was recorded, or all samples share one timestamp, a notice is
    printed instead of raising ``ValueError`` or ``ZeroDivisionError``.
    """
    # Interpolate the list; passing it as a second print() argument would
    # emit the literal "%s" followed by the list.
    print("\nconcurrencies: %s" % (concurs,))

    timestamps = [ts for ts, _ in concurs]
    delta = max(timestamps) - min(timestamps) if timestamps else 0
    if delta <= 0:
        print("\nAverage concurrency: not enough samples")
        return

    # Timestamps are in milliseconds, hence the factor of 1000.
    average = sum(value for _, value in concurs) * 1000 / delta
    print("\nAverage concurrency: %.2f VU/s" % average)
"""
This Molotov script uses events to generate a success/failure output
"""
import json
import molotov
import time
# Start timestamp (ms) of each scenario run in progress,
# keyed by (info["wid"], scenario name).
_T = {}
def _now():
    """Return the current ``time.time()`` value converted to milliseconds."""
    seconds = time.time()
    return seconds * 1000
@molotov.events()
async def record_time(event, **info):
    """Event hook: print one JSON line per finished scenario run.

    ``scenario_start`` stores the current time under
    ``(info["wid"], scenario name)``. ``scenario_success`` and
    ``scenario_failure`` pop that start time and print a JSON object with
    ``ts``, ``type``, ``name`` and ``duration`` (integer milliseconds).
    Failures also carry ``exception`` (the class name) and
    ``errorMessage``. Other events are ignored.

    If no start time was recorded for the run, nothing is printed. The
    duration is computed only after that check, so a missing start no
    longer raises ``TypeError``.
    """
    if event == "scenario_start":
        scenario = info["scenario"]
        _T[(info["wid"], scenario["name"])] = _now()
        return

    if event not in ("scenario_success", "scenario_failure"):
        return

    scenario = info["scenario"]
    start_time = _T.pop((info["wid"], scenario["name"]), None)
    if start_time is None:
        # No matching scenario_start was seen, so there is nothing to time.
        return

    duration = int(_now() - start_time)
    record = {
        "ts": time.time(),
        "type": event,
        "name": scenario["name"],
    }
    if event == "scenario_failure":
        exception = info["exception"]
        record["exception"] = exception.__class__.__name__
        record["errorMessage"] = str(exception)
    # Added last so the key order matches the original output.
    record["duration"] = duration
    print(json.dumps(record))