150 lines
5.4 KiB
Python
150 lines
5.4 KiB
Python
import requests
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from urllib.parse import urlparse
|
|
import sys
|
|
import signal
|
|
|
|
# Biến cờ để kiểm soát việc dừng luồng
|
|
stop_event = threading.Event()
|
|
|
|
def make_request(url, request_count, thread_id):
|
|
"""Hàm gửi một request đến URL và in kết quả"""
|
|
if stop_event.is_set():
|
|
return None, None
|
|
try:
|
|
start_time = time.time()
|
|
response = requests.get(url, timeout=10)
|
|
elapsed_time = time.time() - start_time
|
|
status = response.status_code
|
|
print(f"Thread {thread_id} - Request {request_count}: Status {status}, Time {elapsed_time:.3f}s")
|
|
return status, elapsed_time
|
|
except requests.RequestException as e:
|
|
print(f"Thread {thread_id} - Request {request_count}: Error - {str(e)}")
|
|
return None, None
|
|
|
|
def stress_test(url, num_requests, num_threads):
|
|
"""Hàm chính thực hiện stress test"""
|
|
print(f"\nStarting stress test on {url}")
|
|
print(f"Total requests: {num_requests}, Threads: {num_threads}\n")
|
|
|
|
# Tính số request mỗi luồng
|
|
requests_per_thread = num_requests // num_threads
|
|
extra_requests = num_requests % num_threads
|
|
|
|
start_time = time.time()
|
|
success_count = 0
|
|
total_time = 0
|
|
request_count = 0
|
|
|
|
def worker(thread_id, req_count):
|
|
"""Hàm worker cho mỗi luồng"""
|
|
nonlocal success_count, total_time, request_count
|
|
for i in range(1, req_count + 1):
|
|
if stop_event.is_set():
|
|
break
|
|
status, elapsed = make_request(url, request_count + i, thread_id)
|
|
if status == 200:
|
|
success_count += 1
|
|
total_time += elapsed
|
|
request_count += 1
|
|
|
|
# Tạo ThreadPoolExecutor để quản lý luồng
|
|
with ThreadPoolExecutor(max_workers=num_threads) as executor:
|
|
futures = []
|
|
current_request_count = 0
|
|
for i in range(num_threads):
|
|
reqs = requests_per_thread + (1 if i < extra_requests else 0)
|
|
if reqs > 0:
|
|
futures.append(executor.submit(worker, i + 1, reqs))
|
|
current_request_count += reqs
|
|
|
|
# Chờ tất cả các luồng hoàn thành hoặc bị dừng
|
|
try:
|
|
for future in futures:
|
|
future.result()
|
|
except KeyboardInterrupt:
|
|
print("\nReceived Ctrl+C, stopping all threads...")
|
|
stop_event.set() # Đặt cờ để dừng tất cả luồng
|
|
executor._threads.clear() # Xóa các luồng đang chạy
|
|
threading.enumerate()[-1].join() # Đợi luồng chính kết thúc
|
|
|
|
total_duration = time.time() - start_time
|
|
avg_time = total_time / success_count if success_count > 0 else 0
|
|
|
|
# In báo cáo kết quả
|
|
print("\n=== Stress Test Report ===")
|
|
print(f"Total Requests Sent: {request_count}")
|
|
print(f"Successful Requests: {success_count}")
|
|
print(f"Failed Requests: {request_count - success_count}")
|
|
print(f"Total Duration: {total_duration:.2f}s")
|
|
print(f"Average Response Time (successful requests): {avg_time:.3f}s")
|
|
print(f"Requests per Second: {request_count / total_duration:.2f}" if total_duration > 0 else 0)
|
|
|
|
def validate_url(url):
|
|
"""Kiểm tra URL hợp lệ"""
|
|
if not url.startswith(('http://', 'https://')):
|
|
url = 'http://' + url
|
|
try:
|
|
result = urlparse(url)
|
|
return url if result.scheme and result.netloc else None
|
|
except:
|
|
return None
|
|
|
|
def signal_handler(sig, frame):
|
|
"""Xử lý tín hiệu Ctrl+C"""
|
|
print("\nReceived Ctrl+C, initiating graceful shutdown...")
|
|
stop_event.set()
|
|
sys.exit(1)
|
|
|
|
def main():
|
|
"""Hàm chính để chạy chương trình"""
|
|
# Đăng ký xử lý tín hiệu Ctrl+C
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
print("=== Stress Test Tool ===")
|
|
|
|
# Nhập và kiểm tra URL
|
|
url = input("Enter target URL (e.g., http://example.com): ").strip()
|
|
url = validate_url(url)
|
|
while not url:
|
|
print("Invalid URL. Please enter a valid URL.")
|
|
url = input("Enter target URL (e.g., http://example.com): ").strip()
|
|
url = validate_url(url)
|
|
|
|
# Nhập số lượng request
|
|
while True:
|
|
try:
|
|
num_requests = int(input("Enter number of requests: "))
|
|
if num_requests <= 0:
|
|
print("Number of requests must be greater than 0.")
|
|
continue
|
|
break
|
|
except ValueError:
|
|
print("Please enter a valid number.")
|
|
|
|
# Nhập số luồng
|
|
while True:
|
|
try:
|
|
num_threads = int(input("Enter number of threads: "))
|
|
if num_threads <= 0:
|
|
print("Number of threads must be greater than 0.")
|
|
continue
|
|
if num_threads > num_requests:
|
|
print("Number of threads cannot exceed number of requests.")
|
|
continue
|
|
break
|
|
except ValueError:
|
|
print("Please enter a valid number.")
|
|
|
|
# Chạy stress test
|
|
try:
|
|
stress_test(url, num_requests, num_threads)
|
|
except KeyboardInterrupt:
|
|
print("\nStress test interrupted by user.")
|
|
stop_event.set()
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |