diff options
| author | kj_sh604 | 2026-02-11 11:57:38 -0500 |
|---|---|---|
| committer | kj_sh604 | 2026-02-11 11:57:38 -0500 |
| commit | d060ff5ca58e87e6dd1c6d93b97adbfd45f57f9b (patch) | |
| tree | aded1017253e8aa5b5c2996caf30630f8ed89c1f /src/loadtest.py | |
| parent | aa845e20842ecf93f7e69b03097dbc6508a70fc8 (diff) | |
Diffstat (limited to 'src/loadtest.py')
| -rw-r--r-- | src/loadtest.py | 504 |
1 files changed, 504 insertions, 0 deletions
diff --git a/src/loadtest.py b/src/loadtest.py new file mode 100644 index 0000000..73807b5 --- /dev/null +++ b/src/loadtest.py @@ -0,0 +1,504 @@ +#!/usr/bin/env python3 + +# this is the entry point. run it with: +# python3 loadtest.py +# +# or with a custom scenario: +# python3 loadtest.py --scenario scenarios/example.py +# +# all configuration lives in config.py — go change stuff +# there before running this. + +import argparse +import importlib.util +import inspect +import json +import logging +import os +import statistics +import sys +import time +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timezone + +from playwright.sync_api import sync_playwright + +import config +from scenarios.base import BaseScenario + +# logging setup +def setup_logging(): + """configure logging based on config.py settings.""" + log_level = getattr(logging, config.LOG_LEVEL.upper(), logging.DEBUG) + + fmt = ( + "%(asctime)s | %(levelname)-8s | %(name)-30s | %(message)s" + ) + datefmt = "%Y-%m-%d %H:%M:%S" + + handlers = [logging.StreamHandler(sys.stdout)] + + if config.LOG_TO_FILE: + log_dir = os.path.dirname(config.LOG_FILE) + if log_dir: + os.makedirs(log_dir, exist_ok=True) + file_handler = logging.FileHandler(config.LOG_FILE, mode="w") + handlers.append(file_handler) + + logging.basicConfig( + level=log_level, + format=fmt, + datefmt=datefmt, + handlers=handlers, + ) + + # quiet down some noisy loggers + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("asyncio").setLevel(logging.WARNING) + + +logger = logging.getLogger("loadtest") + + +# scenario loader +def load_scenario(scenario_path): + """ + dynamically load a scenario from a .py file path. + finds the first class that subclasses BaseScenario. + """ + logger.info(f"loading scenario from: {scenario_path}") + + if not os.path.isfile(scenario_path): + logger.error(f"scenario file not found: {scenario_path}") + sys.exit(1) + + spec = importlib.util.spec_from_file_location("scenario_module", scenario_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # find the first BaseScenario subclass in the module + scenario_class = None + for _name, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, BaseScenario) and obj is not BaseScenario: + scenario_class = obj + break + + if scenario_class is None: + logger.error( + f"no BaseScenario subclass found in {scenario_path}. " + "make sure your scenario class inherits from scenarios.base.BaseScenario." + ) + sys.exit(1) + + logger.info(f"loaded scenario: {scenario_class.name} ({scenario_class.__name__})") + return scenario_class + + +# single virtual user runner +def run_virtual_user(user_id, scenario_class, results_list): + """ + runs a single virtual user through the scenario. + each virtual user gets its own browser context. + """ + user_logger = logging.getLogger(f"loadtest.user-{user_id:03d}") + user_logger.info("virtual user starting up...") + + user_results = { + "user_id": user_id, + "iterations": [], + "errors": [], + } + + try: + with sync_playwright() as p: + # launch browser + browser_launcher = getattr(p, config.BROWSER_TYPE) + browser = browser_launcher.launch(headless=config.HEADLESS) + + user_logger.info( + f"browser launched: {config.BROWSER_TYPE} " + f"(headless={config.HEADLESS})" + ) + + context = browser.new_context( + base_url=config.BASE_URL, + viewport={ + "width": config.VIEWPORT_WIDTH, + "height": config.VIEWPORT_HEIGHT, + }, + ) + context.set_default_navigation_timeout(config.NAVIGATION_TIMEOUT) + context.set_default_timeout(config.ACTION_TIMEOUT) + + page = context.new_page() + + # instantiate the scenario + scenario = scenario_class() + + # wire up request/response hooks + page.on("response", scenario.on_response) + page.on("request", scenario.on_request) + + user_logger.info( + f"starting {config.ITERATIONS_PER_USER} iteration(s)..." + ) + + for iteration in range(1, config.ITERATIONS_PER_USER + 1): + iter_logger = logging.getLogger( + f"loadtest.user-{user_id:03d}.iter-{iteration}" + ) + iter_logger.info(f"--- iteration {iteration} start ---") + + iter_result = { + "iteration": iteration, + "start_time": None, + "end_time": None, + "duration_ms": None, + "success": False, + "error": None, + } + + start = time.perf_counter() + iter_result["start_time"] = datetime.now(timezone.utc).isoformat() + + try: + scenario.setup(page) + scenario.run(page) + scenario.teardown(page) + iter_result["success"] = True + iter_logger.info("iteration completed successfully.") + except Exception as e: + iter_result["error"] = str(e) + user_results["errors"].append( + { + "iteration": iteration, + "error": str(e), + "traceback": traceback.format_exc(), + } + ) + iter_logger.error(f"iteration failed: {e}") + iter_logger.debug(traceback.format_exc()) + finally: + end = time.perf_counter() + duration_ms = round((end - start) * 1000, 2) + iter_result["end_time"] = datetime.now(timezone.utc).isoformat() + iter_result["duration_ms"] = duration_ms + iter_logger.info(f"duration: {duration_ms}ms") + user_results["iterations"].append(iter_result) + + # think time between iterations + if iteration < config.ITERATIONS_PER_USER and config.THINK_TIME > 0: + iter_logger.debug( + f"think time: sleeping {config.THINK_TIME}s..." + ) + time.sleep(config.THINK_TIME) + + # cleanup + page.close() + context.close() + browser.close() + user_logger.info("virtual user finished, browser closed.") + + except Exception as e: + user_logger.error(f"virtual user crashed: {e}") + user_logger.debug(traceback.format_exc()) + user_results["errors"].append( + { + "iteration": 0, + "error": str(e), + "traceback": traceback.format_exc(), + } + ) + + results_list.append(user_results) + return user_results + + +# results summary generaton +def summarize_results(all_results): + """crunch the numbers and print a summary.""" + total_iterations = 0 + successful = 0 + failed = 0 + durations = [] + + for user in all_results: + for iteration in user["iterations"]: + total_iterations += 1 + if iteration["success"]: + successful += 1 + durations.append(iteration["duration_ms"]) + else: + failed += 1 + + summary = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "config": { + "base_url": config.BASE_URL, + "concurrent_users": config.CONCURRENT_USERS, + "iterations_per_user": config.ITERATIONS_PER_USER, + "think_time": config.THINK_TIME, + "browser_type": config.BROWSER_TYPE, + "headless": config.HEADLESS, + }, + "totals": { + "total_iterations": total_iterations, + "successful": successful, + "failed": failed, + "success_rate": ( + round(successful / total_iterations * 100, 2) + if total_iterations > 0 + else 0 + ), + }, + "timing_ms": {}, + "per_user": all_results, + } + + if durations: + summary["timing_ms"] = { + "min": round(min(durations), 2), + "max": round(max(durations), 2), + "mean": round(statistics.mean(durations), 2), + "median": round(statistics.median(durations), 2), + "stdev": ( + round(statistics.stdev(durations), 2) + if len(durations) > 1 + else 0 + ), + "p90": round( + sorted(durations)[int(len(durations) * 0.9) - 1], 2 + ) if len(durations) >= 2 else round(durations[0], 2), + "p95": round( + sorted(durations)[int(len(durations) * 0.95) - 1], 2 + ) if len(durations) >= 2 else round(durations[0], 2), + } + + return summary + + +# stdout printing +def print_summary(summary): + """pretty print the results to stdout.""" + logger.info("") + logger.info("=" * 60) + logger.info(" LOAD TEST RESULTS") + logger.info("=" * 60) + logger.info(f" target: {summary['config']['base_url']}") + logger.info(f" browser: {summary['config']['browser_type']}") + logger.info(f" concurrent users: {summary['config']['concurrent_users']}") + logger.info(f" iterations/user: {summary['config']['iterations_per_user']}") + logger.info(f" think time: {summary['config']['think_time']}s") + logger.info("-" * 60) + logger.info(f" total iterations: {summary['totals']['total_iterations']}") + logger.info(f" successful: {summary['totals']['successful']}") + logger.info(f" failed: {summary['totals']['failed']}") + logger.info(f" success rate: {summary['totals']['success_rate']}%") + logger.info("-" * 60) + + if summary["timing_ms"]: + t = summary["timing_ms"] + logger.info(f" min: {t['min']}ms") + logger.info(f" max: {t['max']}ms") + logger.info(f" mean: {t['mean']}ms") + logger.info(f" median: {t['median']}ms") + logger.info(f" stdev: {t['stdev']}ms") + logger.info(f" p90: {t['p90']}ms") + logger.info(f" p95: {t['p95']}ms") + + logger.info("=" * 60) + + # log any errors + total_errors = sum(len(u["errors"]) for u in summary["per_user"]) + if total_errors > 0: + logger.warning(f" total errors: {total_errors}") + for user in summary["per_user"]: + for err in user["errors"]: + logger.warning( + f" user {user['user_id']} / iter {err['iteration']}: " + f"{err['error']}" + ) + logger.info("=" * 60) + + + +def main(): + parser = argparse.ArgumentParser( + description="pw-loadtest: playwright-based load testing framework", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=( + "examples:\n" + " python3 loadtest.py\n" + " python3 loadtest.py --scenario scenarios/example.py\n" + " python3 loadtest.py --url http://myapp:3000 --users 10 --iterations 5\n" + ), + ) + parser.add_argument( + "--scenario", + default=None, + help="path to the scenario .py file (overrides config.py)", + ) + parser.add_argument( + "--url", + default=None, + help="override BASE_URL from config.py", + ) + parser.add_argument( + "--users", + type=int, + default=None, + help="override CONCURRENT_USERS from config.py", + ) + parser.add_argument( + "--iterations", + type=int, + default=None, + help="override ITERATIONS_PER_USER from config.py", + ) + parser.add_argument( + "--headed", + action="store_true", + default=False, + help="run browsers visually (overrides HEADLESS=True in config)", + ) + + args = parser.parse_args() + + # apply CLI overrides + if args.url: + config.BASE_URL = args.url + if args.users: + config.CONCURRENT_USERS = args.users + if args.iterations: + config.ITERATIONS_PER_USER = args.iterations + if args.headed: + config.HEADLESS = False + + setup_logging() + + # determine which scenario(s) to run + # priority: CLI arg > config.SCENARIOS > default + scenario_paths = [] + + if args.scenario: + scenario_paths = [args.scenario] + elif config.SCENARIOS is not None: + if isinstance(config.SCENARIOS, str): + scenario_paths = [config.SCENARIOS] + elif isinstance(config.SCENARIOS, list): + scenario_paths = config.SCENARIOS + else: + logger.error( + "config.SCENARIOS must be None, a string, or a list of strings" + ) + sys.exit(1) + else: + # default + scenario_paths = ["scenarios/example.py"] + + logger.info("=" * 60) + logger.info(" pw-loadtest starting up...") + logger.info("=" * 60) + logger.info(f" target URL: {config.BASE_URL}") + logger.info(f" concurrent users: {config.CONCURRENT_USERS}") + logger.info(f" iterations/user: {config.ITERATIONS_PER_USER}") + logger.info(f" think time: {config.THINK_TIME}s") + logger.info(f" browser: {config.BROWSER_TYPE}") + logger.info(f" headless: {config.HEADLESS}") + logger.info(f" log level: {config.LOG_LEVEL}") + logger.info(f" scenario(s): {len(scenario_paths)}") + logger.info("=" * 60) + + # run each scenario + all_scenario_results = [] + overall_start = time.perf_counter() + + for scenario_path in scenario_paths: + logger.info("") + logger.info("=" * 60) + logger.info(f" running scenario: {scenario_path}") + logger.info("=" * 60) + + # load the scenario + scenario_class = load_scenario(scenario_path) + + # run virtual users concurrently + scenario_results = [] + + logger.info( + f"launching {config.CONCURRENT_USERS} virtual user(s) concurrently..." + ) + + with ThreadPoolExecutor(max_workers=config.CONCURRENT_USERS) as executor: + futures = { + executor.submit( + run_virtual_user, user_id, scenario_class, scenario_results + ): user_id + for user_id in range(1, config.CONCURRENT_USERS + 1) + } + + for future in as_completed(futures): + user_id = futures[future] + try: + future.result() + except Exception as e: + logger.error(f"user {user_id} thread failed: {e}") + + # store results for this scenario + all_scenario_results.append({ + "scenario_path": scenario_path, + "scenario_name": scenario_class.name, + "results": scenario_results, + }) + + overall_end = time.perf_counter() + overall_duration = round((overall_end - overall_start) * 1000, 2) + logger.info("") + logger.info(f"all scenarios finished. total wall time: {overall_duration}ms") + + # summarize and print results for each scenario + logger.info("") + for scenario_data in all_scenario_results: + logger.info("=" * 60) + logger.info(f" scenario: {scenario_data['scenario_name']}") + logger.info(f" path: {scenario_data['scenario_path']}") + logger.info("=" * 60) + summary = summarize_results(scenario_data["results"]) + print_summary(summary) + + # combine all results for final output + all_results = [] + for scenario_data in all_scenario_results: + all_results.extend(scenario_data["results"]) + + final_summary = summarize_results(all_results) + final_summary["wall_time_ms"] = overall_duration + final_summary["scenarios"] = [ + { + "path": s["scenario_path"], + "name": s["scenario_name"], + } + for s in all_scenario_results + ] + + # write results to file + results_path = config.RESULTS_FILE + results_dir = os.path.dirname(results_path) + if results_dir: + os.makedirs(results_dir, exist_ok=True) + with open(results_path, "w") as f: + json.dump(final_summary, f, indent=2, default=str) + logger.info("") + logger.info(f"detailed results written to: {results_path}") + + # exit with non-zero if any failures + if final_summary["totals"]["failed"] > 0: + logger.warning("some iterations failed — exiting with code 1") + sys.exit(1) + + logger.info("all done. 🤙") + + +if __name__ == "__main__": + main()
\ No newline at end of file |
