"""Interactive multithreaded HTTP stress-test tool."""

import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse

import requests

# Flag shared by all threads; once set, workers stop issuing new requests.
stop_event = threading.Event()


def make_request(url, request_count, thread_id):
    """Send one GET request to ``url`` and print the outcome.

    Args:
        url: Target URL.
        request_count: Sequence number of this request (used for logging only).
        thread_id: Identifier of the calling worker (used for logging only).

    Returns:
        ``(status_code, elapsed_seconds)`` when a response was received, or
        ``(None, None)`` when the stop flag is set or the request raised
        ``requests.RequestException``.
    """
    if stop_event.is_set():
        return None, None

    # perf_counter is monotonic, so the measurement cannot be skewed by
    # wall-clock adjustments.
    started = time.perf_counter()
    try:
        response = requests.get(url, timeout=10)
    except requests.RequestException as e:
        print(f"Thread {thread_id} - Request {request_count}: Error - {e}")
        return None, None
    elapsed_time = time.perf_counter() - started

    status = response.status_code
    print(f"Thread {thread_id} - Request {request_count}: Status {status}, Time {elapsed_time:.3f}s")
    return status, elapsed_time


def stress_test(url, num_requests, num_threads):
    """Run the stress test and print a summary report.

    The requests are split as evenly as possible between ``num_threads``
    workers. If a ``KeyboardInterrupt`` arrives while waiting, the workers are
    asked to stop and the report covers the requests issued so far.

    Args:
        url: Target URL.
        num_requests: Total number of requests to send.
        num_threads: Number of worker threads.

    Raises:
        ValueError: If ``num_threads`` is not greater than 0.
    """
    if num_threads <= 0:
        raise ValueError("num_threads must be greater than 0")

    print(f"\nStarting stress test on {url}")
    print(f"Total requests: {num_requests}, Threads: {num_threads}\n")

    # Every worker gets the base share; the first `extra_requests` workers
    # take one additional request each.
    requests_per_thread, extra_requests = divmod(num_requests, num_threads)

    # The counters below are shared by all workers, so every update goes
    # through this lock.
    stats_lock = threading.Lock()
    success_count = 0
    total_time = 0.0
    request_count = 0

    def worker(thread_id, req_count):
        """Issue up to ``req_count`` requests, stopping early on stop_event."""
        nonlocal success_count, total_time, request_count
        for _ in range(req_count):
            if stop_event.is_set():
                break
            # Reserve a unique, sequential request number before sending.
            with stats_lock:
                request_count += 1
                request_number = request_count
            status, elapsed = make_request(url, request_number, thread_id)
            if status == 200:
                with stats_lock:
                    success_count += 1
                    total_time += elapsed

    start_time = time.perf_counter()

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        try:
            for i in range(num_threads):
                reqs = requests_per_thread + (1 if i < extra_requests else 0)
                if reqs > 0:
                    futures.append(executor.submit(worker, i + 1, reqs))

            # Wait for every worker to finish (or to be told to stop).
            for future in futures:
                future.result()
        except KeyboardInterrupt:
            print("\nReceived Ctrl+C, stopping all threads...")
            stop_event.set()
            # Drop work that has not started; running workers see the flag
            # and exit after their current request. Leaving the ``with``
            # block then joins them.
            for future in futures:
                future.cancel()

    total_duration = time.perf_counter() - start_time
    avg_time = total_time / success_count if success_count > 0 else 0
    requests_per_second = request_count / total_duration if total_duration > 0 else 0

    # Summary report.
    print("\n=== Stress Test Report ===")
    print(f"Total Requests Sent: {request_count}")
    print(f"Successful Requests: {success_count}")
    print(f"Failed Requests: {request_count - success_count}")
    print(f"Total Duration: {total_duration:.2f}s")
    print(f"Average Response Time (successful requests): {avg_time:.3f}s")
    print(f"Requests per Second: {requests_per_second:.2f}")


def validate_url(url):
    """Return a normalized URL, or ``None`` if ``url`` is not usable.

    ``http://`` is prepended when no http/https scheme is present. The URL is
    accepted only if it parses with both a scheme and a network location.
    """
    if not url.startswith(('http://', 'https://')):
        url = 'http://' + url
    try:
        result = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs, e.g. a broken IPv6 literal.
        return None
    return url if result.scheme and result.netloc else None


def signal_handler(sig, frame):
    """Handle SIGINT: flag all workers to stop, then interrupt the main thread.

    Raising ``KeyboardInterrupt`` (rather than exiting here) lets
    ``stress_test`` wind down its workers and still print the partial report.
    """
    print("\nReceived Ctrl+C, initiating graceful shutdown...")
    stop_event.set()
    raise KeyboardInterrupt


def _prompt_int(prompt, check):
    """Prompt until the user enters an integer that ``check`` accepts.

    ``check`` receives the parsed integer and returns an error message to
    print, or ``None`` when the value is acceptable.
    """
    while True:
        try:
            value = int(input(prompt))
        except ValueError:
            print("Please enter a valid number.")
            continue
        error = check(value)
        if error is None:
            return value
        print(error)


def main():
    """Prompt for the test parameters and run the stress test."""
    # Install the Ctrl+C handler.
    signal.signal(signal.SIGINT, signal_handler)

    print("=== Stress Test Tool ===")

    def check_requests(value):
        if value <= 0:
            return "Number of requests must be greater than 0."
        return None

    try:
        # Read and validate the URL.
        url = validate_url(input("Enter target URL (e.g., http://example.com): ").strip())
        while not url:
            print("Invalid URL. Please enter a valid URL.")
            url = validate_url(input("Enter target URL (e.g., http://example.com): ").strip())

        # Read the number of requests.
        num_requests = _prompt_int("Enter number of requests: ", check_requests)

        def check_threads(value):
            if value <= 0:
                return "Number of threads must be greater than 0."
            if value > num_requests:
                return "Number of threads cannot exceed number of requests."
            return None

        # Read the number of threads.
        num_threads = _prompt_int("Enter number of threads: ", check_threads)
    except KeyboardInterrupt:
        # Interrupted while prompting: nothing is running yet, just exit.
        sys.exit(1)

    # Run the stress test.
    try:
        stress_test(url, num_requests, num_threads)
    except KeyboardInterrupt:
        print("\nStress test interrupted by user.")
        stop_event.set()
        sys.exit(1)

    # stress_test handles a single Ctrl+C itself and still prints its report;
    # keep the non-zero exit status for an interrupted run.
    if stop_event.is_set():
        sys.exit(1)


if __name__ == "__main__":
    main()