# Usage (run from a shell, not part of this module):
#   pip3 install -r .\\requirements.txt
#   python3 .\\ecs_autoscaler.py --cluster apdev-cluster --region ap-northeast-2
#   python3 .\\rps_monitor.py
#   python3 .\\ecs_ngrep_gui.py
#   python3 .\\waf_gui.py
import boto3
import time
from datetime import datetime, timedelta, UTC
from rich.console import Console
from rich.table import Table
from rich.live import Live
from rich.panel import Panel
from rich.layout import Layout
import threading
import sys
import platform
# Windows-compatible keyboard input
if platform.system() == 'Windows':
import msvcrt
else:
import select
import tty
import termios
class ECSAutoscaler:
    """CPU-driven autoscaler for ECS services with a live rich console UI."""

    def __init__(self, cluster_name, region='ap-northeast-2', log_group_prefix=None, min_tasks=1):
        """Create the AWS clients and initialize all tracking state."""
        # One boto3 client per AWS API we touch, all bound to the same region;
        # each attribute name matches its boto3 service name.
        for service in ('ecs', 'cloudwatch', 'elbv2', 'logs', 'autoscaling', 'ec2'):
            setattr(self, service, boto3.client(service, region_name=region))
        self.cluster_name = cluster_name
        self.log_group_prefix = log_group_prefix or f"/aws/ecs/{cluster_name}"
        self.min_tasks = min_tasks
        self.console = Console()
        # Display state refreshed each loop iteration.
        self.service_data = {}
        self.node_data = {}
        self.last_update = ""
        # Scaling bookkeeping, keyed by service name.
        self.scaling_history = {}          # last target desired count + timestamp
        self.last_scale_out = {}           # last scale-out time per service
        self.last_scale_in = {}            # last scale-in time per service
        self.target_group_cache = {}       # cached target group ARNs
        self.scale_in_counter = {}         # consecutive low-CPU readings
        self.cpu_history = {}              # CPU readings for 2-cycle monitoring
        self.last_desired_count = {}       # last known desired count
        self.manual_modification_time = {} # manual modification timestamps
        self.cpu_metric_mode = 'max'       # 'max', 'avg', or 'off'
        self.keyboard_thread = None
def get_services(self):
response = self.ecs.list_services(cluster=self.cluster_name)
return [arn.split('/')[-1] for arn in response['serviceArns']]
def start_keyboard_listener(self):
"""Start keyboard listener in a separate thread"""
def listen_for_keys():
if platform.system() == 'Windows':
# Windows version using msvcrt
while True:
if msvcrt.kbhit():
key = msvcrt.getch().decode('utf-8').lower()
if key == 'm':
self.toggle_cpu_metric()
elif key == 'q':
break
time.sleep(0.1)
else:
# Unix/Linux version
old_settings = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
while True:
if select.select([sys.stdin], [], [], 0.1)[0]:
key = sys.stdin.read(1)
if key.lower() == 'm':
self.toggle_cpu_metric()
elif key.lower() == 'q':
break
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
self.keyboard_thread = threading.Thread(target=listen_for_keys, daemon=True)
self.keyboard_thread.start()
def toggle_cpu_metric(self):
"""Toggle between max, avg, and off CPU metrics"""
if self.cpu_metric_mode == 'max':
self.cpu_metric_mode = 'avg'
elif self.cpu_metric_mode == 'avg':
self.cpu_metric_mode = 'off'
else: # 'off'
self.cpu_metric_mode = 'max'
def get_cpu_value(self, service_name):
"""Get CPU value based on current mode (max or avg)"""
if service_name in self.service_data:
if self.cpu_metric_mode == 'max':
return self.service_data[service_name]['cpu_max']
else:
return self.service_data[service_name]['cpu_avg']
return 0
    def get_cpu_data(self, service_name):
        """Fetch recent CPUUtilization for one service from CloudWatch.

        Returns (current_max, current_avg, history) where history is up to the
        last 10 one-minute datapoints as dicts with 'time', 'max', 'avg'.
        Falls back to previously cached values when CloudWatch has no data.
        """
        end_time = datetime.now(UTC)
        start_time = end_time - timedelta(minutes=5)  # Reduced to 5 minutes for more recent data
        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ECS',
            MetricName='CPUUtilization',
            Dimensions=[
                {'Name': 'ServiceName', 'Value': service_name},
                {'Name': 'ClusterName', 'Value': self.cluster_name}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=60,  # 1 minute intervals for more granular data
            Statistics=['Maximum', 'Average']
        )
        history = []
        current_max, current_avg = 0, 0
        if response['Datapoints']:
            sorted_data = sorted(response['Datapoints'], key=lambda x: x['Timestamp'])
            # Build the chronological history series (for graphs).
            for dp in sorted_data:
                history.append({
                    'time': dp['Timestamp'].strftime("%H:%M"),
                    'max': dp['Maximum'],
                    'avg': dp['Average']
                })
            # Current values for the table: max / mean over the two most recent datapoints.
            recent_data = sorted_data[-2:] if len(sorted_data) >= 2 else sorted_data
            if recent_data:
                current_max = max(dp['Maximum'] for dp in recent_data)
                current_avg = sum(dp['Average'] for dp in recent_data) / len(recent_data)
        # Use cached values when CloudWatch returned no datapoints.
        if not response['Datapoints'] and service_name in self.service_data:
            current_max = self.service_data[service_name]['cpu_max']
            current_avg = self.service_data[service_name]['cpu_avg']
        return current_max, current_avg, history[-10:]
def get_service_details(self, service_name):
response = self.ecs.describe_services(
cluster=self.cluster_name,
services=[service_name]
)
service = response['services'][0]
return {
'desired': service['desiredCount'],
'running': service['runningCount'],
'pending': service['pendingCount'],
'status': service['status'],
'task_definition': service['taskDefinition'].split('/')[-1]
}
def get_memory_utilization(self, service_name):
end_time = datetime.now(UTC)
start_time = end_time - timedelta(minutes=1)
response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/ECS',
MetricName='MemoryUtilization',
Dimensions=[
{'Name': 'ServiceName', 'Value': service_name},
{'Name': 'ClusterName', 'Value': self.cluster_name}
],
StartTime=start_time,
EndTime=end_time,
Period=60,
Statistics=['Maximum']
)
if response['Datapoints']:
return max(dp['Maximum'] for dp in response['Datapoints'])
if service_name in self.service_data:
return self.service_data[service_name].get('memory', 0)
return 0
def get_scaling_progress(self, service_name, desired, running, pending):
if service_name not in self.scaling_history:
return "Stable", "green"
history = self.scaling_history[service_name]
target = history['target']
if desired == running and pending == 0:
if desired == target:
return "Complete", "green"
else:
return "Stable", "green"
elif running < desired:
progress = int((running / desired) * 100) if desired > 0 else 0
if pending > 0:
return f"Scaling {progress}%", "yellow"
else:
return f"Starting {progress}%", "bright_yellow"
elif running > desired:
return "Draining", "red"
else:
return "In Progress", "yellow"
    def detect_manual_modification(self, service_name, current_desired):
        """Detect whether the service's desired count changed outside this autoscaler.

        Compares the observed desired count against the last one we recorded.
        A change that matches our own recent scaling target (within 10 seconds)
        is attributed to us; any other change is treated as manual, recorded in
        manual_modification_time, and True is returned.

        NOTE(review): uses naive datetime.now() here and in scaling_history —
        consistent internally, but distinct from the UTC-aware timestamps used
        for CloudWatch queries.
        """
        # First observation for this service: just record it, not a modification.
        if service_name not in self.last_desired_count:
            self.last_desired_count[service_name] = current_desired
            return False
        last_known = self.last_desired_count[service_name]
        # Check if desired count changed without our scaling action
        if current_desired != last_known:
            # Check if this change was from our recent scaling action
            if service_name in self.scaling_history:
                our_target = self.scaling_history[service_name]['target']
                our_time = self.scaling_history[service_name]['timestamp']
                # If current desired matches our target and it's recent (within 10 seconds), it's our change
                if current_desired == our_target and (datetime.now() - our_time).total_seconds() < 10:
                    self.last_desired_count[service_name] = current_desired
                    return False
            # This is a manual modification
            self.manual_modification_time[service_name] = datetime.now()
            self.last_desired_count[service_name] = current_desired
            return True
        return False
def is_in_manual_cooldown(self, service_name):
"""Check if service is in manual modification cooldown (60 seconds)"""
if service_name not in self.manual_modification_time:
return False
cooldown_seconds = 60
time_since_manual = (datetime.now() - self.manual_modification_time[service_name]).total_seconds()
return time_since_manual < cooldown_seconds
if service_name not in self.last_scale_out:
return True
last_scale_time = self.last_scale_out[service_name]
cooldown_seconds = 120 # 2 minutes cooldown
time_since_last_scale = (datetime.now() - last_scale_time).total_seconds()
return time_since_last_scale >= cooldown_seconds
def can_scale_out(self, service_name):
if service_name not in self.last_scale_out:
return True
last_scale_time = self.last_scale_out[service_name]
cooldown_seconds = 120 # 2 minutes cooldown
time_since_last_scale = (datetime.now() - last_scale_time).total_seconds()
return time_since_last_scale >= cooldown_seconds
def can_scale_in(self, service_name):
if service_name not in self.last_scale_in:
return True
last_scale_time = self.last_scale_in[service_name]
cooldown_seconds = 60
time_since_last_scale = (datetime.now() - last_scale_time).total_seconds()
return time_since_last_scale >= cooldown_seconds
def scale_service(self, service_name, desired_count, is_scale_out=False):
self.ecs.update_service(
cluster=self.cluster_name,
service=service_name,
desiredCount=desired_count
)
self.scaling_history[service_name] = {
'target': desired_count,
'timestamp': datetime.now()
}
def get_cpu_history(self, service_name):
# 이제 캐시된 데이터 사용
if service_name in self.service_data and 'cpu_history' in self.service_data[service_name]:
return self.service_data[service_name]['cpu_history']
return []
    def get_running_ec2_instances(self):
        """Get running EC2 instances in the cluster's ASG.

        Returns a dict keyed by EC2 instance ID with task counts, private IP,
        and instance type for each running container instance; {} when the
        cluster has none or when any AWS call fails.

        NOTE(review): the broad except silently swallows all errors (including
        permission problems) and returns {} — consider logging.
        """
        try:
            # Get container instances from ECS cluster
            response = self.ecs.list_container_instances(cluster=self.cluster_name)
            if not response['containerInstanceArns']:
                return {}
            # Get detailed info about container instances
            instances_response = self.ecs.describe_container_instances(
                cluster=self.cluster_name,
                containerInstances=response['containerInstanceArns']
            )
            # Get EC2 instance IDs
            instance_ids = [inst['ec2InstanceId'] for inst in instances_response['containerInstances']]
            if not instance_ids:
                return {}
            # Get EC2 instance details
            ec2_response = self.ec2.describe_instances(InstanceIds=instance_ids)
            running_instances = {}
            for reservation in ec2_response['Reservations']:
                for instance in reservation['Instances']:
                    if instance['State']['Name'] == 'running':
                        instance_id = instance['InstanceId']
                        # Find corresponding container instance to join ECS task
                        # counts with EC2 metadata.
                        for cont_inst in instances_response['containerInstances']:
                            if cont_inst['ec2InstanceId'] == instance_id:
                                running_instances[instance_id] = {
                                    'running_tasks': cont_inst['runningTasksCount'],
                                    'pending_tasks': cont_inst['pendingTasksCount'],
                                    'total_tasks': cont_inst['runningTasksCount'] + cont_inst['pendingTasksCount'],
                                    'private_ip': instance.get('PrivateIpAddress', 'N/A'),
                                    'instance_type': instance.get('InstanceType', 'N/A')
                                }
                                break
            return running_instances
        except Exception as e:
            return {}
    def create_ec2_task_display(self):
        """Create simple text display showing EC2 instance ID and task count.

        One line per running instance: "<instance-id> <total task count>".
        """
        running_instances = self.get_running_ec2_instances()
        if not running_instances:
            return "No running EC2 instances found"
        display_text = ""
        for instance_id, data in running_instances.items():
            # Recomputed from parts even though 'total_tasks' is also available
            # in the dict returned by get_running_ec2_instances.
            total_tasks = data['running_tasks'] + data['pending_tasks']
            display_text += f"{instance_id} {total_tasks}\\n"
        return display_text.rstrip()
    def get_asg_info(self):
        """Collect Auto Scaling Group capacity/state for the cluster's instances.

        Discovers ASGs via the 'aws:autoscaling:groupName' tag on the cluster's
        EC2 container instances, then returns a dict keyed by ASG name with
        desired/min/max capacity and per-lifecycle-state instance counts.
        Returns {} when the cluster is empty or on any AWS error.
        """
        try:
            # List the ECS cluster's container instances.
            response = self.ecs.list_container_instances(cluster=self.cluster_name)
            if not response['containerInstanceArns']:
                return {}
            # Fetch container instance details.
            instances_response = self.ecs.describe_container_instances(
                cluster=self.cluster_name,
                containerInstances=response['containerInstanceArns']
            )
            # Collect the backing EC2 instance IDs.
            instance_ids = [inst['ec2InstanceId'] for inst in instances_response['containerInstances']]
            if not instance_ids:
                return {}
            # Look up EC2 instance details (for their tags).
            ec2_response = self.ec2.describe_instances(InstanceIds=instance_ids)
            asg_data = {}
            for reservation in ec2_response['Reservations']:
                for instance in reservation['Instances']:
                    # Find the Auto Scaling Group tag on each instance.
                    asg_name = None
                    for tag in instance.get('Tags', []):
                        if tag['Key'] == 'aws:autoscaling:groupName':
                            asg_name = tag['Value']
                            break
                    if asg_name and asg_name not in asg_data:
                        # Describe the ASG once per distinct group name.
                        asg_response = self.autoscaling.describe_auto_scaling_groups(
                            AutoScalingGroupNames=[asg_name]
                        )
                        if asg_response['AutoScalingGroups']:
                            asg = asg_response['AutoScalingGroups'][0]
                            asg_data[asg_name] = {
                                'desired': asg['DesiredCapacity'],
                                'min': asg['MinSize'],
                                'max': asg['MaxSize'],
                                'instances': len([i for i in asg['Instances'] if i['LifecycleState'] == 'InService']),
                                'pending': len([i for i in asg['Instances'] if i['LifecycleState'] == 'Pending']),
                                'terminating': len([i for i in asg['Instances'] if i['LifecycleState'] == 'Terminating'])
                            }
            return asg_data
        except Exception:
            return {}
    def create_node_table(self):
        """Build the rich Table summarizing ASG capacity and scaling status."""
        table = Table(title=f"Auto Scaling Groups - Cluster: {self.cluster_name}")
        table.add_column("ASG Name", style="cyan", width=30)
        table.add_column("Capacity (D/Min/Max)", style="yellow", width=18)
        table.add_column("Instances (R/P/T)", style="green", width=15)
        table.add_column("Status", style="white", width=15)
        for asg_name, data in self.node_data.items():
            # Derive a human-readable status from lifecycle-state counts.
            status = "Stable"
            status_color = "green"
            if data['pending'] > 0:
                status = "Scaling Out"
                status_color = "yellow"
            elif data['terminating'] > 0:
                status = "Scaling In"
                status_color = "red"
            elif data['instances'] != data['desired']:
                status = "Adjusting"
                # NOTE(review): 'orange' is not a standard rich color name
                # (rich defines orange1/orange3/etc.) — confirm this renders
                # rather than raising a markup error.
                status_color = "orange"
            table.add_row(
                asg_name,
                f"{data['desired']}/{data['min']}/{data['max']}",
                f"{data['instances']}/{data['pending']}/{data['terminating']}",
                f"[{status_color}]{status}[/{status_color}]"
            )
        return table
    def create_config_panel(self):
        """Return the static help/configuration text shown in the TUI.

        The body of the panel is a runtime string (partly Korean) and is left
        byte-for-byte unchanged; only the current CPU-metric-mode badge varies.
        """
        if self.cpu_metric_mode == 'max':
            cpu_mode_display = f"[bold green]CPU MAX[/bold green]"
        elif self.cpu_metric_mode == 'avg':
            cpu_mode_display = f"[bold yellow]CPU AVG[/bold yellow]"
        else:  # 'off'
            cpu_mode_display = f"[bold red]SCALING OFF[/bold red]"
        config_text = f"""[bold cyan]Scaling Configuration:[/bold cyan]
• CPU Metric Mode: {cpu_mode_display} (Press 'M' to toggle)
• CPU Scale-out Threshold: 50% (normal services), 70% (stress services)
• CPU Scale-in Threshold: 25%
• CPU Emergency Scale-in: < 10% (scale to 1 task immediately)
• Scale-out Cooldown: 120 seconds
• Scale-in Cooldown: 60 seconds
• Manual Modification Cooldown: 60 seconds
[bold yellow]🚀 Scale-out 동작:[/bold yellow]
• Normal 서비스 (임계치 초과 정도별):
- 30% 이상 초과: 2개 필요
- 20% 이상 초과: 1개 필요
- 10% 이상 초과: 1개 필요
• Stress 서비스 (임계치 초과 정도별):
- 30% 이상 초과: 3개 필요
- 20% 이상 초과: 2개 필요
- 10% 이상 초과: 1개 필요
• 120초 cooldown 적용
• Pending task 수만큼 필요 task에서 차감
• EC2 자동 확장: 모든 인스턴스가 4개 task로 가득 차면 EC2 1대 추가
[bold red]📉 Scale-in 동작:[/bold red]
• CPU 30% 이하: 1개씩 task 감소
• CPU 10% 이하: 즉시 1개 task로 긴급 축소
• Scale-in 중일 때는 EC2 확장 안함
[bold green]💡 스마트 기능:[/bold green]
• Pending task 고려하여 중복 scaling 방지
• CPU 사용률 기반 최적 task 수 계산
• t3.medium 기준 인스턴스당 최대 4개 task
• 수동 수정 감지 시 60초 자동 scaling 일시정지
• Scaling Off 모드: 모든 데이터 수집하지만 자동 스케일링 비활성화
[bold magenta]⌨️ Controls:[/bold magenta]
• Press 'M' to toggle CPU metric (Max/Avg/Off)
• Press 'Q' to quit"""
        return config_text
if not history:
return f"No {metric_name} data"
values = [h[metric_name.lower()] for h in history]
max_val = max(values) if values else 100
scale = max_val / 10 if max_val > 0 else 1
graph = f"{metric_name} (0-{max_val:.0f}%):\\n"
for i, val in enumerate(values):
bar_height = int(val / scale) if scale > 0 else 0
bar = "█" * bar_height + "░" * (10 - bar_height)
time_str = history[i]['time']
graph += f"{time_str} |{bar}| {val:4.1f}%\\n"
return graph
    def create_display(self):
        """Compose the full TUI: services table, ASG table, EC2 tasks, config, footer."""
        table = Table(title=f"ECS Autoscaler - Cluster: {self.cluster_name}")
        table.add_column("Service", style="cyan", width=20)
        table.add_column("Status", style="green", width=10)
        table.add_column("Tasks (D/R/P)", style="yellow", width=12)
        table.add_column("Progress", style="white", width=12)
        table.add_column("CPU Max %", style="magenta", width=10)
        table.add_column("CPU Avg %", style="blue", width=10)
        table.add_column("Task P90 ms", style="red", width=10)
        table.add_column("RPS", style="cyan", width=10)
        table.add_column("Success %", style="green", width=10)
        table.add_column("Memory %", style="blue", width=10)
        table.add_column("Scaling Action", style="red", width=25)
        for service, data in self.service_data.items():
            threshold = self.get_threshold(service)
            cpu_max_color = "red" if data['cpu_max'] > threshold else "green"
            # NOTE(review): avg CPU is colored red when LOW (< 40) — the
            # opposite polarity of every other color rule here; confirm intent.
            cpu_avg_color = "red" if data['cpu_avg'] < 40 else "green"
            task_p90_color = "red" if data['task_p90'] > 300 else "green"
            mem_color = "red" if data['memory'] > 80 else "green"
            # Calculate success rate
            success_rate = 0
            if data['total_requests'] > 0:
                success_rate = (data['success_requests'] / data['total_requests']) * 100
            success_color = "red" if success_rate < 95 else "green"
            progress_text, progress_color = self.get_scaling_progress(
                service, data['desired'], data['running'], data['pending']
            )
            table.add_row(
                service,
                data['status'],
                f"{data['desired']}/{data['running']}/{data['pending']}",
                f"[{progress_color}]{progress_text}[/{progress_color}]",
                f"[{cpu_max_color}]{data['cpu_max']:.1f}%[/{cpu_max_color}]",
                f"[{cpu_avg_color}]{data['cpu_avg']:.1f}%[/{cpu_avg_color}]",
                f"[{task_p90_color}]{data['task_p90']:.0f}[/{task_p90_color}]",
                f"{data['total_requests']}",  # Show RPS as simple number
                f"[{success_color}]{success_rate:.1f}%[/{success_color}]",
                f"[{mem_color}]{data['memory']:.1f}%[/{mem_color}]",
                data['scaling_action']
            )
        # Create EC2 task distribution display
        ec2_task_display = self.create_ec2_task_display()
        layout = Layout()
        layout.split_column(
            Layout(Panel(table, title="ECS CPU Autoscaler", border_style="bright_blue"), name="main"),
            Layout(Panel(self.create_node_table(), title="Cluster Nodes", border_style="bright_green"), name="nodes"),
            Layout(Panel(ec2_task_display, title="EC2 Task Distribution", border_style="bright_cyan"), name="ec2_tasks"),
            Layout(Panel(self.create_config_panel(), title="Configuration", border_style="bright_magenta"), name="config"),
            Layout(Panel(f"Last Update: {self.last_update}", border_style="dim"), name="bottom")
        )
        # Fixed row heights for each panel.
        layout["main"].size = 15
        layout["nodes"].size = 10
        layout["ec2_tasks"].size = 12
        layout["config"].size = 8
        layout["bottom"].size = 3
        return layout
def get_ec2_task_distribution(self):
"""Get task distribution across EC2 instances"""
try:
# Get container instances
response = self.ecs.list_container_instances(cluster=self.cluster_name)
if not response['containerInstanceArns']:
return {}
# Get detailed info about container instances
instances_response = self.ecs.describe_container_instances(
cluster=self.cluster_name,
containerInstances=response['containerInstanceArns']
)
instance_tasks = {}
for instance in instances_response['containerInstances']:
instance_id = instance['ec2InstanceId']
running_tasks = instance['runningTasksCount']
pending_tasks = instance['pendingTasksCount']
instance_tasks[instance_id] = {
'running': running_tasks,
'pending': pending_tasks,
'total': running_tasks + pending_tasks
}
return instance_tasks
except Exception as e:
print(f"Error getting EC2 task distribution: {e}")
return {}
    def check_asg_scale_needed(self):
        """Decide whether the ASG should grow by one instance.

        True only when (a) no service looks like it is scaling in (cached CPU
        below 25%), (b) at least one task is pending, and (c) every container
        instance already holds 4+ tasks (t3.medium capacity assumption).
        """
        try:
            # Get total pending tasks across all services
            total_pending = 0
            any_scale_in = False
            for service_name in self.get_services():
                service_details = self.get_service_details(service_name)
                total_pending += service_details['pending']
                # Check if any service is scaling in
                if service_name in self.service_data:
                    cpu_value = self.get_cpu_value(service_name)
                    if cpu_value < 25:  # Scale-in threshold
                        any_scale_in = True
            # Don't scale ASG if any service is scaling in
            if any_scale_in:
                return False
            # Check if we have pending tasks
            if total_pending == 0:
                return False
            # Get current EC2 task distribution
            instance_tasks = self.get_ec2_task_distribution()
            if not instance_tasks:
                return False
            # Check if all instances are at capacity (4 tasks each for t3.medium)
            max_tasks_per_instance = 4
            all_at_capacity = True
            for instance_id, tasks in instance_tasks.items():
                if tasks['total'] < max_tasks_per_instance:
                    all_at_capacity = False
                    break
            return all_at_capacity and total_pending > 0
        except Exception as e:
            print(f"Error checking ASG scale need: {e}")
            return False
    def scale_asg_up(self):
        """Scale up the cluster's ASG by 1 instance (capped at MaxSize).

        NOTE(review): the target ASG is found by the hard-coded name prefix
        'apdev-node-', not derived from cluster_name — this ties the method to
        one environment; confirm before reuse elsewhere.
        """
        try:
            # Find ASG with apdev-node- pattern
            response = self.autoscaling.describe_auto_scaling_groups()
            asg_name = None
            for asg in response['AutoScalingGroups']:
                if asg['AutoScalingGroupName'].startswith('apdev-node-'):
                    asg_name = asg['AutoScalingGroupName']
                    break
            if not asg_name:
                print("No ASG found with apdev-node- pattern")
                return False
            # Get current ASG details
            asg_response = self.autoscaling.describe_auto_scaling_groups(
                AutoScalingGroupNames=[asg_name]
            )
            asg = asg_response['AutoScalingGroups'][0]
            current_desired = asg['DesiredCapacity']
            max_size = asg['MaxSize']
            if current_desired >= max_size:
                print(f"ASG already at max capacity: {current_desired}/{max_size}")
                return False
            # Increase desired capacity by 1
            new_desired = current_desired + 1
            self.autoscaling.set_desired_capacity(
                AutoScalingGroupName=asg_name,
                DesiredCapacity=new_desired,
                HonorCooldown=False
            )
            print(f"Scaled ASG {asg_name} from {current_desired} to {new_desired}")
            return True
        except Exception as e:
            print(f"Error scaling ASG: {e}")
            return False
def get_threshold(self, service_name):
return 70 if 'stress' in service_name else 50 # Scale-out threshold: 70% for stress services, 50% for others
return 75 if 'stress' in service_name else 50 # Scale-out threshold: 75% for stress services, 50% for others
    def get_target_group_arn(self, service_name):
        """Resolve (and cache) the ALB target group ARN for a service.

        The target group name is derived by convention:
        '<cluster-prefix>-<service-base>-tg', where the cluster prefix is the
        cluster name up to '-cluster' and the service base is the service name
        with trailing digits stripped. Returns None when lookup fails.
        """
        # Extract cluster prefix for target group naming
        cluster_prefix = self.cluster_name.split('-cluster')[0] if '-cluster' in self.cluster_name else self.cluster_name
        # Remove trailing numbers from service name
        import re
        service_base = re.sub(r'\\d+$', '', service_name)
        target_group_name = f"{cluster_prefix}-{service_base}-tg"
        # Check cache first
        if target_group_name in self.target_group_cache:
            return self.target_group_cache[target_group_name]
        try:
            # Get target group ARN dynamically
            response = self.elbv2.describe_target_groups(Names=[target_group_name])
            if response['TargetGroups']:
                arn = response['TargetGroups'][0]['TargetGroupArn']
                self.target_group_cache[target_group_name] = arn
                return arn
        except Exception:
            pass
        return None
    def get_task_latency_p90(self, service_name):
        """Estimate p90 latency (ms) from the service's CloudWatch log group.

        Scans up to 100 log events from the last 5 minutes whose message
        contains 'latency', parses the first '<number> ms|μs|us' token from
        each, and returns the 90th percentile in milliseconds (0 on no data or
        any error).
        """
        end_time = datetime.now(UTC)
        start_time = end_time - timedelta(minutes=5)  # Increased for testing
        log_group_name = f"{self.log_group_prefix}/{service_name}"
        try:
            response = self.logs.filter_log_events(
                logGroupName=log_group_name,
                startTime=int(start_time.timestamp() * 1000),
                endTime=int(end_time.timestamp() * 1000),
                filterPattern='latency',
                limit=100
            )
            latencies = []
            for event in response.get('events', []):
                message = event['message']
                import re
                # Match a number followed by a ms/μs/us unit.
                match = re.search(r'(\\d+(?:\\.\\d+)?)\\s*(ms|μs|us)', message, re.IGNORECASE)
                if match:
                    latency = float(match.group(1))
                    unit = match.group(2).lower()
                    # Convert microseconds to milliseconds.
                    if unit in ['μs', 'us']:
                        latency = latency / 1000
                    latencies.append(latency)
            if latencies:
                latencies.sort()
                p90_index = int(len(latencies) * 0.9)
                return latencies[p90_index] if p90_index < len(latencies) else latencies[-1]
        except Exception:
            pass
        return 0
    def get_task_log_requests(self, service_name):
        """Estimate request rates from the service's log volume.

        Returns (rps, success_rps, failure_rps) as ints, where one log event
        is counted as one request and an event is "successful" if its message
        contains 200/201/202/'success'/'ok'. Returns (0, 0, 0) on any error.
        """
        log_group_name = f"{self.log_group_prefix}/{service_name}"
        # Query the window from 30s ago to 10s ago to allow for CloudWatch
        # Logs ingestion delay.
        end_time = int(time.time() * 1000) - 10000  # 10 seconds ago
        start_time = end_time - 20000  # 20-second window
        try:
            response = self.logs.filter_log_events(
                logGroupName=log_group_name,
                startTime=start_time,
                endTime=end_time,
            )
            events = response.get("events", [])
            total_requests = len(events)
            # Convert the 20-second window's event count to a per-second rate.
            rps = total_requests / 20 if total_requests > 0 else 0
            success_requests = 0
            for event in events:
                message = event['message'].lower()
                if any(keyword in message for keyword in ['200', '201', '202', 'success', 'ok']):
                    success_requests += 1
            success_rps = success_requests / 20 if success_requests > 0 else 0
            failure_rps = rps - success_rps
            return int(rps), int(success_rps), int(failure_rps)
        except Exception:
            return 0, 0, 0
def format_number(self, num):
if num >= 1_000_000_000:
return f"{num/1_000_000_000:.1f}b"
elif num >= 1_000_000:
return f"{num/1_000_000:.1f}m"
elif num >= 1_000:
return f"{num/1_000:.1f}k"
else:
return f"{int(num)}"
    def run(self):
        """Main loop: poll metrics, apply scaling decisions, and refresh the TUI.

        Runs forever: each second it refreshes per-service metrics, applies
        CPU-based scale-out/scale-in (unless paused by manual-modification
        cooldown or 'off' mode), checks ASG capacity every 60s, and redraws
        the live display.
        """
        # Start keyboard listener
        self.start_keyboard_listener()
        with Live(self.create_display(), refresh_per_second=1) as live:
            while True:
                services = self.get_services()
                self.node_data = self.get_asg_info()
                self.last_update = datetime.now().strftime("%H:%M:%S")
                for service_name in services:
                    # Gather all metrics for this service up front.
                    cpu_max, cpu_avg, cpu_history = self.get_cpu_data(service_name)
                    memory_util = self.get_memory_utilization(service_name)
                    task_p90 = self.get_task_latency_p90(service_name)
                    total_requests, success_requests, failure_requests = self.get_task_log_requests(service_name)
                    service_details = self.get_service_details(service_name)
                    scaling_action = "Normal"
                    up_threshold = self.get_threshold(service_name)
                    down_threshold = 25  # Scale-in threshold changed to 25%
                    p90_threshold = 300  # 300ms (0.3s)
                    # Get current desired count from ECS
                    current_desired = service_details['desired']
                    # Check for manual modifications
                    manual_modified = self.detect_manual_modification(service_name, current_desired)
                    in_manual_cooldown = self.is_in_manual_cooldown(service_name)
                    # Skip scaling if in manual cooldown or scaling is off
                    if in_manual_cooldown:
                        remaining_time = 60 - (datetime.now() - self.manual_modification_time[service_name]).total_seconds()
                        scaling_action = f"Manual cooldown ({remaining_time:.0f}s remaining)"
                    elif manual_modified:
                        scaling_action = "Manual modification detected"
                    elif self.cpu_metric_mode == 'off':
                        scaling_action = "Scaling disabled"
                    else:
                        # Simple CPU-based scaling (only if not in manual cooldown and scaling is enabled)
                        cpu_value = self.get_cpu_value(service_name)
                        if cpu_value > up_threshold and self.can_scale_out(service_name):
                            # Get CPU value based on current mode
                            cpu_value = self.get_cpu_value(service_name)
                            # Determine scale amount based on CPU difference and service type
                            cpu_diff = cpu_value - up_threshold
                            if 'stress' in service_name:
                                # Stress service scaling: 10%->1, 20%->2, 30%->3
                                if cpu_diff >= 30:
                                    needed_tasks = 3
                                elif cpu_diff >= 20:
                                    needed_tasks = 2
                                elif cpu_diff >= 10:
                                    needed_tasks = 1
                                else:
                                    needed_tasks = 1
                            else:
                                # Normal service scaling: 10%->1, 20%->1, 30%->2
                                if cpu_diff >= 30:
                                    needed_tasks = 2
                                elif cpu_diff >= 20:
                                    needed_tasks = 1
                                elif cpu_diff >= 10:
                                    needed_tasks = 1
                                else:
                                    needed_tasks = 1
                            # Account for pending tasks
                            pending_tasks = service_details['pending']
                            actual_scale_amount = max(0, needed_tasks - pending_tasks)
                            if actual_scale_amount > 0:
                                new_count = current_desired + actual_scale_amount
                                self.scale_service(service_name, new_count, is_scale_out=True)
                                self.last_scale_out[service_name] = datetime.now()
                                scaling_action = f"Scale-out +{actual_scale_amount} (needed: {needed_tasks}, pending: {pending_tasks}) [{self.cpu_metric_mode.upper()}]"
                            else:
                                scaling_action = f"No scale needed (pending: {pending_tasks} >= needed: {needed_tasks})"
                        elif cpu_value > up_threshold and not self.can_scale_out(service_name):
                            scaling_action = "CPU Cooldown"
                        # Scale-in logic
                        elif cpu_value < down_threshold and current_desired > 1 and self.can_scale_in(service_name):
                            if cpu_value <= 10:  # Emergency scale-in
                                new_count = 1
                                self.scale_service(service_name, new_count)
                                self.last_scale_in[service_name] = datetime.now()
                                scaling_action = f"Emergency scale to 1 task (CPU: {cpu_value:.1f}%) [{self.cpu_metric_mode.upper()}]"
                            else:  # Normal scale-in
                                new_count = max(1, current_desired - 1)
                                self.scale_service(service_name, new_count)
                                self.last_scale_in[service_name] = datetime.now()
                                scaling_action = f"Scale-in -1 task (CPU: {cpu_value:.1f}%) [{self.cpu_metric_mode.upper()}]"
                        elif cpu_value < down_threshold and not self.can_scale_in(service_name):
                            scaling_action = "Scale-in cooldown"
                    # Cache everything the display (and next iteration) needs.
                    self.service_data[service_name] = {
                        'cpu_max': cpu_max,
                        'cpu_avg': cpu_avg,
                        'cpu_history': cpu_history,
                        'memory': memory_util,
                        'task_p90': task_p90,
                        'total_requests': total_requests,
                        'success_requests': success_requests,
                        'failure_requests': failure_requests,
                        'desired': current_desired,
                        'running': service_details['running'],
                        'pending': service_details['pending'],
                        'status': service_details['status'],
                        'scaling_action': scaling_action
                    }
                # Check if ASG needs scaling after processing all services (every 60 seconds)
                # Skip ASG scaling if scaling is disabled
                current_time = datetime.now()
                if (self.cpu_metric_mode != 'off' and
                        (not hasattr(self, 'last_asg_check') or (current_time - self.last_asg_check).total_seconds() >= 60)):
                    if self.check_asg_scale_needed():
                        self.scale_asg_up()
                    self.last_asg_check = current_time
                live.update(self.create_display())
                time.sleep(1)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: build the autoscaler from CLI flags and run it.
    parser = argparse.ArgumentParser(description='ECS CPU Autoscaler')
    parser.add_argument('--cluster', required=True, help='ECS cluster name')
    parser.add_argument('--region', default='ap-northeast-2', help='AWS region')
    parser.add_argument('--log-group-prefix', help='Log group prefix (default: /aws/ecs/{cluster_name})')
    args = parser.parse_args()
    ECSAutoscaler(args.cluster, region=args.region, log_group_prefix=args.log_group_prefix).run()