#!/usr/bin/env python3 # this is the entry point. run it with: # python3 loadtest.py # # or with a custom scenario: # python3 loadtest.py --scenario scenarios/example.py # # all configuration lives in config.py — go change stuff # there before running this. import argparse import importlib.util import inspect import json import logging import os import statistics import sys import time import traceback from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from playwright.sync_api import sync_playwright import config from scenarios.base import BaseScenario # logging setup def setup_logging(): """configure logging based on config.py settings.""" log_level = getattr(logging, config.LOG_LEVEL.upper(), logging.DEBUG) fmt = ( "%(asctime)s | %(levelname)-8s | %(name)-30s | %(message)s" ) datefmt = "%Y-%m-%d %H:%M:%S" handlers = [logging.StreamHandler(sys.stdout)] if config.LOG_TO_FILE: log_dir = os.path.dirname(config.LOG_FILE) if log_dir: os.makedirs(log_dir, exist_ok=True) file_handler = logging.FileHandler(config.LOG_FILE, mode="w") handlers.append(file_handler) logging.basicConfig( level=log_level, format=fmt, datefmt=datefmt, handlers=handlers, ) # quiet down some noisy loggers logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("asyncio").setLevel(logging.WARNING) logger = logging.getLogger("loadtest") # scenario loader def load_scenario(scenario_path): """ dynamically load a scenario from a .py file path. finds the first class that subclasses BaseScenario. """ logger.info(f"loading scenario from: {scenario_path}") if not os.path.isfile(scenario_path): logger.error(f"scenario file not found: {scenario_path}") sys.exit(1) spec = importlib.util.spec_from_file_location("scenario_module", scenario_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # find the first BaseScenario subclass in the module scenario_class = None for _name, obj in inspect.getmembers(module, inspect.isclass): if issubclass(obj, BaseScenario) and obj is not BaseScenario: scenario_class = obj break if scenario_class is None: logger.error( f"no BaseScenario subclass found in {scenario_path}. " "make sure your scenario class inherits from scenarios.base.BaseScenario." ) sys.exit(1) logger.info(f"loaded scenario: {scenario_class.name} ({scenario_class.__name__})") return scenario_class # single virtual user runner def run_virtual_user(user_id, scenario_class, results_list): """ runs a single virtual user through the scenario. each virtual user gets its own browser context. """ user_logger = logging.getLogger(f"loadtest.user-{user_id:03d}") user_logger.info("virtual user starting up...") user_results = { "user_id": user_id, "iterations": [], "errors": [], } try: with sync_playwright() as p: # launch browser browser_launcher = getattr(p, config.BROWSER_TYPE) browser = browser_launcher.launch(headless=config.HEADLESS) user_logger.info( f"browser launched: {config.BROWSER_TYPE} " f"(headless={config.HEADLESS})" ) context = browser.new_context( base_url=config.BASE_URL, viewport={ "width": config.VIEWPORT_WIDTH, "height": config.VIEWPORT_HEIGHT, }, ) context.set_default_navigation_timeout(config.NAVIGATION_TIMEOUT) context.set_default_timeout(config.ACTION_TIMEOUT) page = context.new_page() # instantiate the scenario scenario = scenario_class() # wire up request/response hooks page.on("response", scenario.on_response) page.on("request", scenario.on_request) user_logger.info( f"starting {config.ITERATIONS_PER_USER} iteration(s)..." ) for iteration in range(1, config.ITERATIONS_PER_USER + 1): iter_logger = logging.getLogger( f"loadtest.user-{user_id:03d}.iter-{iteration}" ) iter_logger.info(f"--- iteration {iteration} start ---") iter_result = { "iteration": iteration, "start_time": None, "end_time": None, "duration_ms": None, "success": False, "error": None, } start = time.perf_counter() iter_result["start_time"] = datetime.now(timezone.utc).isoformat() try: scenario.setup(page) scenario.run(page) scenario.teardown(page) iter_result["success"] = True iter_logger.info("iteration completed successfully.") except Exception as e: iter_result["error"] = str(e) user_results["errors"].append( { "iteration": iteration, "error": str(e), "traceback": traceback.format_exc(), } ) iter_logger.error(f"iteration failed: {e}") iter_logger.debug(traceback.format_exc()) finally: end = time.perf_counter() duration_ms = round((end - start) * 1000, 2) iter_result["end_time"] = datetime.now(timezone.utc).isoformat() iter_result["duration_ms"] = duration_ms iter_logger.info(f"duration: {duration_ms}ms") user_results["iterations"].append(iter_result) # think time between iterations if iteration < config.ITERATIONS_PER_USER and config.THINK_TIME > 0: iter_logger.debug( f"think time: sleeping {config.THINK_TIME}s..." ) time.sleep(config.THINK_TIME) # cleanup page.close() context.close() browser.close() user_logger.info("virtual user finished, browser closed.") except Exception as e: user_logger.error(f"virtual user crashed: {e}") user_logger.debug(traceback.format_exc()) user_results["errors"].append( { "iteration": 0, "error": str(e), "traceback": traceback.format_exc(), } ) results_list.append(user_results) return user_results # results summary generaton def summarize_results(all_results): """crunch the numbers and print a summary.""" total_iterations = 0 successful = 0 failed = 0 durations = [] for user in all_results: for iteration in user["iterations"]: total_iterations += 1 if iteration["success"]: successful += 1 durations.append(iteration["duration_ms"]) else: failed += 1 summary = { "timestamp": datetime.now(timezone.utc).isoformat(), "config": { "base_url": config.BASE_URL, "concurrent_users": config.CONCURRENT_USERS, "iterations_per_user": config.ITERATIONS_PER_USER, "think_time": config.THINK_TIME, "browser_type": config.BROWSER_TYPE, "headless": config.HEADLESS, }, "totals": { "total_iterations": total_iterations, "successful": successful, "failed": failed, "success_rate": ( round(successful / total_iterations * 100, 2) if total_iterations > 0 else 0 ), }, "timing_ms": {}, "per_user": all_results, } if durations: summary["timing_ms"] = { "min": round(min(durations), 2), "max": round(max(durations), 2), "mean": round(statistics.mean(durations), 2), "median": round(statistics.median(durations), 2), "stdev": ( round(statistics.stdev(durations), 2) if len(durations) > 1 else 0 ), "p90": round( sorted(durations)[int(len(durations) * 0.9) - 1], 2 ) if len(durations) >= 2 else round(durations[0], 2), "p95": round( sorted(durations)[int(len(durations) * 0.95) - 1], 2 ) if len(durations) >= 2 else round(durations[0], 2), } return summary # stdout printing def print_summary(summary): """pretty print the results to stdout.""" logger.info("") logger.info("=" * 60) logger.info(" LOAD TEST RESULTS") logger.info("=" * 60) logger.info(f" target: {summary['config']['base_url']}") logger.info(f" browser: {summary['config']['browser_type']}") logger.info(f" concurrent users: {summary['config']['concurrent_users']}") logger.info(f" iterations/user: {summary['config']['iterations_per_user']}") logger.info(f" think time: {summary['config']['think_time']}s") logger.info("-" * 60) logger.info(f" total iterations: {summary['totals']['total_iterations']}") logger.info(f" successful: {summary['totals']['successful']}") logger.info(f" failed: {summary['totals']['failed']}") logger.info(f" success rate: {summary['totals']['success_rate']}%") logger.info("-" * 60) if summary["timing_ms"]: t = summary["timing_ms"] logger.info(f" min: {t['min']}ms") logger.info(f" max: {t['max']}ms") logger.info(f" mean: {t['mean']}ms") logger.info(f" median: {t['median']}ms") logger.info(f" stdev: {t['stdev']}ms") logger.info(f" p90: {t['p90']}ms") logger.info(f" p95: {t['p95']}ms") logger.info("=" * 60) # log any errors total_errors = sum(len(u["errors"]) for u in summary["per_user"]) if total_errors > 0: logger.warning(f" total errors: {total_errors}") for user in summary["per_user"]: for err in user["errors"]: logger.warning( f" user {user['user_id']} / iter {err['iteration']}: " f"{err['error']}" ) logger.info("=" * 60) def main(): parser = argparse.ArgumentParser( description="pw-loadtest: playwright-based load testing framework", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=( "examples:\n" " python3 loadtest.py\n" " python3 loadtest.py --scenario scenarios/example.py\n" " python3 loadtest.py --url http://myapp:3000 --users 10 --iterations 5\n" ), ) parser.add_argument( "--scenario", default=None, help="path to the scenario .py file (overrides config.py)", ) parser.add_argument( "--url", default=None, help="override BASE_URL from config.py", ) parser.add_argument( "--users", type=int, default=None, help="override CONCURRENT_USERS from config.py", ) parser.add_argument( "--iterations", type=int, default=None, help="override ITERATIONS_PER_USER from config.py", ) parser.add_argument( "--headed", action="store_true", default=False, help="run browsers visually (overrides HEADLESS=True in config)", ) args = parser.parse_args() # apply CLI overrides if args.url: config.BASE_URL = args.url if args.users: config.CONCURRENT_USERS = args.users if args.iterations: config.ITERATIONS_PER_USER = args.iterations if args.headed: config.HEADLESS = False setup_logging() # determine which scenario(s) to run # priority: CLI arg > config.SCENARIOS > default scenario_paths = [] if args.scenario: scenario_paths = [args.scenario] elif config.SCENARIOS is not None: if isinstance(config.SCENARIOS, str): scenario_paths = [config.SCENARIOS] elif isinstance(config.SCENARIOS, list): scenario_paths = config.SCENARIOS else: logger.error( "config.SCENARIOS must be None, a string, or a list of strings" ) sys.exit(1) else: # default scenario_paths = ["scenarios/example.py"] logger.info("=" * 60) logger.info(" pw-loadtest starting up...") logger.info("=" * 60) logger.info(f" target URL: {config.BASE_URL}") logger.info(f" concurrent users: {config.CONCURRENT_USERS}") logger.info(f" iterations/user: {config.ITERATIONS_PER_USER}") logger.info(f" think time: {config.THINK_TIME}s") logger.info(f" browser: {config.BROWSER_TYPE}") logger.info(f" headless: {config.HEADLESS}") logger.info(f" log level: {config.LOG_LEVEL}") logger.info(f" scenario(s): {len(scenario_paths)}") logger.info("=" * 60) # run each scenario all_scenario_results = [] overall_start = time.perf_counter() for scenario_path in scenario_paths: logger.info("") logger.info("=" * 60) logger.info(f" running scenario: {scenario_path}") logger.info("=" * 60) # load the scenario scenario_class = load_scenario(scenario_path) # run virtual users concurrently scenario_results = [] logger.info( f"launching {config.CONCURRENT_USERS} virtual user(s) concurrently..." ) with ThreadPoolExecutor(max_workers=config.CONCURRENT_USERS) as executor: futures = { executor.submit( run_virtual_user, user_id, scenario_class, scenario_results ): user_id for user_id in range(1, config.CONCURRENT_USERS + 1) } for future in as_completed(futures): user_id = futures[future] try: future.result() except Exception as e: logger.error(f"user {user_id} thread failed: {e}") # store results for this scenario all_scenario_results.append({ "scenario_path": scenario_path, "scenario_name": scenario_class.name, "results": scenario_results, }) overall_end = time.perf_counter() overall_duration = round((overall_end - overall_start) * 1000, 2) logger.info("") logger.info(f"all scenarios finished. total wall time: {overall_duration}ms") # summarize and print results for each scenario logger.info("") for scenario_data in all_scenario_results: logger.info("=" * 60) logger.info(f" scenario: {scenario_data['scenario_name']}") logger.info(f" path: {scenario_data['scenario_path']}") logger.info("=" * 60) summary = summarize_results(scenario_data["results"]) print_summary(summary) # combine all results for final output all_results = [] for scenario_data in all_scenario_results: all_results.extend(scenario_data["results"]) final_summary = summarize_results(all_results) final_summary["wall_time_ms"] = overall_duration final_summary["scenarios"] = [ { "path": s["scenario_path"], "name": s["scenario_name"], } for s in all_scenario_results ] # write results to file results_path = config.RESULTS_FILE results_dir = os.path.dirname(results_path) if results_dir: os.makedirs(results_dir, exist_ok=True) with open(results_path, "w") as f: json.dump(final_summary, f, indent=2, default=str) logger.info("") logger.info(f"detailed results written to: {results_path}") # exit with non-zero if any failures if final_summary["totals"]["failed"] > 0: logger.warning("some iterations failed — exiting with code 1") sys.exit(1) logger.info("all done. 🤙") if __name__ == "__main__": main()