<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>Forem: Hussein Ayoub</title>
    <description>The latest articles on Forem by Hussein Ayoub (@husseinayoub).</description>
    <link>https://forem.com/husseinayoub</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F2921236%2F9071af22-ccfa-4925-9dd8-42e12c580543.jpg</url>
      <title>Forem: Hussein Ayoub</title>
      <link>https://forem.com/husseinayoub</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://forem.com/feed/husseinayoub"/>
    <language>en</language>
    <item>
      <title>Automating AWS WAF IP Threat list with ProxyCheck.io</title>
      <dc:creator>Hussein Ayoub</dc:creator>
      <pubDate>Thu, 11 Sep 2025 20:14:06 +0000</pubDate>
      <link>https://forem.com/husseinayoub/automating-aws-waf-ip-threat-list-with-proxycheckio-37bf</link>
      <guid>https://forem.com/husseinayoub/automating-aws-waf-ip-threat-list-with-proxycheckio-37bf</guid>
      <description>&lt;p&gt;In this post, I'll automate the process of adding malicious IP addresses classified by the following third-party service &lt;a href="https://proxycheck.io" rel="noopener noreferrer"&gt;https://proxycheck.io&lt;/a&gt; to AWS WAF.&lt;/p&gt;

&lt;p&gt;AWS already provides &lt;a href="https://docs.aws.amazon.com/waf/latest/developerguide/aws-managed-rule-groups-ip-rep.html" rel="noopener noreferrer"&gt;built-in managed lists&lt;/a&gt; you can leverage to protect your resources against malicious IP addresses used for reconnaissance or DDoS attacks, or known to have a generally bad reputation:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;AWSManagedIPReputationList&lt;/li&gt;
&lt;li&gt;AWSManagedReconnaissanceList&lt;/li&gt;
&lt;li&gt;AWSManagedIPDDoSList&lt;/li&gt;
&lt;/ul&gt;
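
&lt;p&gt;For reference, if those managed lists are all you need, attaching them is a single rule block in a Web ACL definition. Here's a minimal CloudFormation sketch (the rule group name comes from the docs linked above; the rule name, priority, and metric name are placeholders):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Rules:
  - Name: AWSManagedIPReputation        # placeholder rule name
    Priority: 0                         # placeholder priority
    Statement:
      ManagedRuleGroupStatement:
        VendorName: AWS
        Name: AWSManagedRulesAmazonIpReputationList
    OverrideAction:
      None: {}
    VisibilityConfig:
      SampledRequestsEnabled: true
      CloudWatchMetricsEnabled: true
      MetricName: AWSManagedIPReputation
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;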

&lt;p&gt;But here we're building our own list based on web server access logs, specifically CloudFront access logs.&lt;/p&gt;

&lt;h2&gt;Prerequisites&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;An API key obtained from &lt;a href="https://proxycheck.io/" rel="noopener noreferrer"&gt;https://proxycheck.io/&lt;/a&gt; (they have a generous free plan; see the quick check after this list)&lt;/li&gt;
&lt;li&gt;You need the necessary IAM permissions to deploy an AWS SAM application and create the needed underlying resources.&lt;/li&gt;
&lt;li&gt;An ElastiCache Valkey cluster for caching ProxyCheck API call results for quota efficiency.&lt;/li&gt;
&lt;li&gt;A Telegram bot so you can receive notifications about WAF updates.&lt;/li&gt;
&lt;li&gt;A SAM app initialized with &lt;a href="https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html" rel="noopener noreferrer"&gt;AWS SAM CLI&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;
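
&lt;p&gt;As a quick check, you can verify your API key with a single HTTP call before wiring anything up. A minimal sketch, assuming the v2 endpoint and the &lt;code&gt;risk&lt;/code&gt; flag as documented by ProxyCheck:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import requests

API_KEY = "xxxx-xxxxx-xxxxx-xxxxx"  # your proxycheck.io API key
ip = "8.8.8.8"

# risk=1 asks ProxyCheck to include a risk score in the response
resp = requests.get(
    f"https://proxycheck.io/v2/{ip}",
    params={"key": API_KEY, "risk": "1"},
    timeout=10,
)
resp.raise_for_status()
data = resp.json()
print(data.get("status"), data.get(ip, {}).get("risk"))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;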

&lt;h2&gt;Walkthrough&lt;/h2&gt;

&lt;p&gt;We will create a new AWS SAM template file and include the following YAML block to define:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A Lambda function running our Python script that interacts with the ProxyCheck API, stores results in ElastiCache Valkey along with the risk score associated with each scanned IP address, and updates the AWS WAF IP set accordingly.&lt;/li&gt;
&lt;li&gt;An EventBridge rule that acts as an event listener to trigger the Lambda function whenever a new CSV file containing our exported list of IP addresses is written to our S3 bucket.&lt;/li&gt;
&lt;li&gt;The necessary IAM permissions attached to the Lambda function to read files from S3 and update the WAF IP set.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: &amp;gt;
  ProxyCheck WAF automation

  ProxyCheck WAF automation Lambda Function that reads IPs from S3, checks risk scores using ProxyCheck.io,
  and updates AWS WAF IP sets with high-risk IPs.

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 900  # 15 minutes
    MemorySize: 512
    LoggingConfig:
      LogFormat: JSON

Resources:
  ProxyCheckWAFAutomation:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: "ProxyCheckWAFAutomation"
      CodeUri: proxy_check_waf_automation/
      Handler: app.lambda_handler
      Runtime: python3.13
      Architectures:
        - x86_64
      Timeout: 900  # 15 minutes
      MemorySize: 512
      Environment:
        Variables:
          S3_BUCKET_NAME: "proxycheck-waf-automation"
          PROXYCHECK_API_KEY: "xxxx-xxxxx-xxxxx-xxxxx"
          TELEGRAM_BOT_TOKEN: "YOUR_TG_BOT_TOKEN"
          TELEGRAM_CHAT_ID: "-xxxxx"
          REDIS_URL: "redis://username:password@hostname:6379/0"
      Events:
        Trigger:
          Type: EventBridgeRule
          Properties:
            Pattern:
              source:
                - "aws.s3"
              detail-type:
                - "Object Created"  # overwriting an existing object also emits "Object Created"
              detail:
                bucket: 
                  name:
                    - "proxycheck-waf-automation"
      Policies:
        - S3ReadPolicy:
            BucketName: "proxycheck-waf-automation"  # bucket name only; the policy template appends /* itself
        - Statement:
            - Effect: Allow
              Action:
                - wafv2:GetIPSet
                - wafv2:UpdateIPSet
              Resource: "*"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;You need to update the environment variable values to match your S3 bucket name, proxycheck.io API key, Telegram bot token, and &lt;a href="https://stackoverflow.com/questions/32423837/telegram-bot-how-to-get-a-group-chat-id" rel="noopener noreferrer"&gt;chat ID&lt;/a&gt;. Please use AWS Systems Manager Parameter Store for your secrets; the above template hardcodes them for demo purposes only.&lt;/p&gt;
&lt;/blockquote&gt;
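
&lt;p&gt;As an illustration, here's a minimal sketch of reading the API key from Parameter Store at cold start instead of a plain environment variable (the parameter name is hypothetical; your Lambda role would also need &lt;code&gt;ssm:GetParameter&lt;/code&gt; and &lt;code&gt;kms:Decrypt&lt;/code&gt;):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import boto3

ssm = boto3.client('ssm')

def get_secret(name: str) -&amp;gt; str:
    # SecureString parameters are decrypted when WithDecryption=True
    resp = ssm.get_parameter(Name=name, WithDecryption=True)
    return resp['Parameter']['Value']

# Hypothetical parameter name; create it beforehand as a SecureString
PROXYCHECK_API_KEY = get_secret('/proxycheck-waf-automation/api-key')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;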

&lt;p&gt;Below is the code needed in your Python script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json
import csv
import os
import io
import logging
from typing import List, Set, Optional
import boto3
import requests
import proxycheck
import botocore.exceptions
import redis

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
s3_client = boto3.client('s3')
wafv2_client = boto3.client('wafv2', region_name='us-east-1')

# Initialize ProxyCheck client
proxy_checker = proxycheck.Blocking(key=os.environ.get('PROXYCHECK_API_KEY'))

# Initialize Redis client
REDIS_URL = os.environ.get('REDIS_URL')
redis_client = None

def get_redis_client():
    """Get Redis client with connection retry logic"""
    global redis_client
    if redis_client is None:
        try:
            redis_client = redis.from_url(
                REDIS_URL,
                socket_timeout=5,
                socket_connect_timeout=5,
                retry_on_timeout=True,
                health_check_interval=30
            )
            # Test connection
            redis_client.ping()
            logger.info("Redis connection established successfully")
        except Exception as e:
            logger.warning(f"Failed to connect to Redis: {str(e)}. Will proceed without caching.")
            redis_client = None
    return redis_client


def lambda_handler(event, context):
    """Daily IP Checker Lambda Function

    This function:
    1. Reads a CSV file from S3 containing IPs
    2. Deduplicates the IPs
    3. Checks risk scores using ProxyCheck.io (limited to 1000 requests)
    4. Adds high-risk IPs (score &amp;gt; 33) to AWS WAF IP set
    5. Sends Telegram notification about the update
    """

    try:
        # Get configuration from environment variables
        s3_bucket = os.environ.get('S3_BUCKET_NAME')
        s3_key = event["detail"]["object"]["key"]
        waf_scope = 'CLOUDFRONT'  # the CLOUDFRONT scope must be used from us-east-1
        waf_ip_set_name = "ProxyCheckAutomation"
        waf_ip_set_id = "your_id_here"  # set the ID of your existing WAF IP set
        telegram_bot_token = os.environ.get('TELEGRAM_BOT_TOKEN')
        telegram_chat_id = os.environ.get('TELEGRAM_CHAT_ID')

        # Validate required configuration (env vars and event-derived values)
        required_vars = [s3_bucket, s3_key, waf_ip_set_name, waf_ip_set_id]
        if not all(required_vars):
            raise ValueError("Missing required configuration values")

        logger.info(f"Starting daily IP check process")

        # Step 1: Read CSV file from S3
        logger.info(f"Reading CSV file from S3: s3://{s3_bucket}/{s3_key}")
        ips = read_ips_from_s3(s3_bucket, s3_key)
        logger.info(f"Found {len(ips)} unique IPs after deduplication")

        # Step 2: Check IP risk scores (limited to 1000 requests)
        logger.info("Checking IP risk scores with ProxyCheck.io and Redis caching")
        risky_ips = check_ip_risk_scores(ips[:1000])
        logger.info(f"Found {len(risky_ips)} risky IPs (risk score &amp;gt; 33)")

        # Step 3: Add risky IPs to AWS WAF IP set
        added_count = 0
        if risky_ips:
            logger.info("Adding risky IPs to AWS WAF IP set")
            # Convert IPs to CIDR format
            networks = {f"{ip}/32" for ip in risky_ips}
            added_count = update_single_ipset(wafv2_client, waf_ip_set_name, waf_scope, waf_ip_set_id, networks)
            logger.info(f"Added {added_count} IPs to WAF IP set")
        else:
            logger.info("No risky IPs found, no updates needed")

        # Step 4: Send Telegram notification with cache stats
        if telegram_bot_token and telegram_chat_id:
            # Get cache efficiency stats for notification
            cache_stats = get_cache_stats()
            send_telegram_notification(
                telegram_bot_token, 
                telegram_chat_id, 
                len(ips), 
                len(risky_ips), 
                added_count,
                context,
                cache_stats
            )

        return {
            "statusCode": 200,
            "body": json.dumps({
                "message": "Daily IP check completed successfully",
                "total_ips_processed": len(ips),
                "risky_ips_found": len(risky_ips),
                "ips_added_to_waf": added_count if risky_ips else 0
            })
        }

    except Exception as e:
        logger.error(f"Error in daily IP check: {str(e)}")

        # Send error notification via Telegram if configured
        telegram_bot_token = os.environ.get('TELEGRAM_BOT_TOKEN')
        telegram_chat_id = os.environ.get('TELEGRAM_CHAT_ID')
        if telegram_bot_token and telegram_chat_id:
            send_error_notification(telegram_bot_token, telegram_chat_id, str(e), context)

        raise  # re-raise, preserving the original traceback


def read_ips_from_s3(bucket: str, key: str) -&amp;gt; List[str]:
    """Read and deduplicate IPs from CSV file in S3"""
    try:
        # Download the CSV file from S3
        response = s3_client.get_object(Bucket=bucket, Key=key)
        csv_content = response['Body'].read().decode('utf-8')

        # Parse CSV and extract IPs
        ips: Set[str] = set()
        csv_reader = csv.reader(io.StringIO(csv_content))

        for row in csv_reader:
            # Skip empty rows
            if not row or len(row) &amp;lt; 2:
                continue

            # IP is in the second column (index 1)
            ip_value = row[1].strip()

            if ip_value and is_valid_ip(ip_value):
                ips.add(ip_value)

        return list(ips)

    except Exception as e:
        logger.error(f"Error reading IPs from S3: {str(e)}")
        raise


def is_valid_ip(ip: str) -&amp;gt; bool:
    """Basic IP validation"""
    try:
        parts = ip.split('.')
        return len(parts) == 4 and all(0 &amp;lt;= int(part) &amp;lt;= 255 for part in parts)
    except (ValueError, AttributeError):
        return False


def get_cached_risk_score(ip: str) -&amp;gt; Optional[int]:
    """Get cached risk score from Redis"""
    try:
        redis_conn = get_redis_client()
        if redis_conn is None:
            return None

        cache_key = f"ip_risk:{ip}"
        cached_score = redis_conn.get(cache_key)

        if cached_score is not None:
            return int(cached_score.decode('utf-8'))
        return None
    except Exception as e:
        logger.warning(f"Error reading from Redis cache for IP {ip}: {str(e)}")
        return None


def cache_risk_score(ip: str, risk_score: int, ttl_hours: int = 720):
    """Cache risk score in Redis with TTL"""
    try:
        redis_conn = get_redis_client()
        if redis_conn is None:
            return

        cache_key = f"ip_risk:{ip}"
        redis_conn.setex(cache_key, ttl_hours * 3600, str(risk_score))
        logger.debug(f"Cached risk score for IP {ip}: {risk_score}")
    except Exception as e:
        logger.warning(f"Error caching risk score for IP {ip}: {str(e)}")


# Global cache stats for tracking
cache_stats = {"cache_hits": 0, "api_calls": 0, "checked_count": 0}

def get_cache_stats():
    """Get current cache statistics"""
    return cache_stats.copy()

def reset_cache_stats():
    """Reset cache statistics"""
    global cache_stats
    cache_stats = {"cache_hits": 0, "api_calls": 0, "checked_count": 0}

def check_ip_risk_scores(ips: List[str]) -&amp;gt; List[str]:
    """Check IP risk scores using ProxyCheck.io with Redis caching and return risky IPs"""
    risky_ips = []
    reset_cache_stats()  # Reset stats at the beginning of each run

    try:
        logger.info(f"Starting to check {len(ips)} IPs with Redis caching")

        for ip in ips:
            try:
                # First, check if we have a cached result
                cached_risk = get_cached_risk_score(ip)

                if cached_risk is not None:
                    risk_score = cached_risk
                    cache_stats["cache_hits"] += 1
                    logger.debug(f"Cache hit for IP {ip}: risk score {risk_score}")
                else:
                    # Check individual IP using proxy_checker.ip() method
                    ip_result = proxy_checker.ip(ip)
                    risk_score = ip_result.risk()
                    cache_stats["api_calls"] += 1

                    # Cache the result for future use
                    cache_risk_score(ip, risk_score)
                    logger.debug(f"API call for IP {ip}: risk score {risk_score}")

                cache_stats["checked_count"] += 1

                if risk_score &amp;gt; 33:
                    risky_ips.append(ip)
                    logger.info(f"Risky IP found: {ip} (risk score: {risk_score})")

                # Log progress every 100 IPs
                if cache_stats["checked_count"] % 100 == 0:
                    logger.info(f"Progress: {cache_stats['checked_count']}/{len(ips)} IPs processed, {len(risky_ips)} risky IPs found. Cache hits: {cache_stats['cache_hits']}, API calls: {cache_stats['api_calls']}")

            except Exception as e:
                logger.warning(f"Error checking IP {ip}: {str(e)}")
                continue

    except Exception as e:
        logger.error(f"Error checking IP risk scores: {str(e)}")
        raise

    cache_efficiency = (cache_stats["cache_hits"] / cache_stats["checked_count"] * 100) if cache_stats["checked_count"] &amp;gt; 0 else 0
    logger.info(f"Completed checking {cache_stats['checked_count']} IPs, found {len(risky_ips)} risky IPs. Cache efficiency: {cache_stats['cache_hits']}/{cache_stats['checked_count']} ({cache_efficiency:.1f}% cache hits), API calls saved: {cache_stats['cache_hits']}")
    return risky_ips


def get_ipset_and_token(client, name, scope, ipset_id):
    """
    Retrieve an existing WAFv2 IPSet.
    Returns (lock_token, description, existing_addresses_set).
    """
    try:
        resp = client.get_ip_set(Name=name, Scope=scope, Id=ipset_id)
    except botocore.exceptions.ClientError as e:
        logger.error(f"Error fetching IPSet {name}: {e}")
        raise

    ipset = resp["IPSet"]
    lock_token = resp["LockToken"]
    description = ipset.get("Description", "")
    existing = set(ipset.get("Addresses", []))
    return lock_token, description, existing


def update_single_ipset(client, name, scope, ipset_id, networks, is_ipv6=False):
    """
    Add any networks in `networks` to the specified IPSet.
    `networks` is a set of CIDR network strings (e.g., '192.168.1.1/32').
    Returns the number of new IPs added.
    """
    try:
        # 1) Fetch existing
        lock_token, description, existing_addrs = get_ipset_and_token(
            client, name, scope, ipset_id
        )

        # 2) Compute new
        to_add = networks - existing_addrs
        if not to_add:
            logger.info(f"[{name}] no new {'IPv6' if is_ipv6 else 'IPv4'} CIDRs to add.")
            return 0

        # 3) Merge and sort
        updated = sorted(existing_addrs | networks)
        limit = 10000
        if len(updated) &amp;gt; limit:
            logger.error(f"[{name}] total addresses {len(updated)} exceed limit {limit}.")
            raise ValueError(f"IPSet would exceed limit of {limit} addresses")

        # 4) Update IPSet
        try:
            resp = client.update_ip_set(
                Name=name,
                Scope=scope,
                Id=ipset_id,
                Description=description or 'Daily IP Risk Checker - High Risk IPs',
                Addresses=list(updated),
                LockToken=lock_token
            )
        except botocore.exceptions.ClientError as e:
            logger.error(f"Error updating IPSet {name}: {e}")
            raise

        next_token = resp.get("NextLockToken")
        logger.info(
            f"[{name}] Inserted {len(to_add)} new items; now has {len(updated)} addresses. "
            f"NextLockToken: {next_token}"
        )
        return len(to_add)

    except Exception as e:
        logger.error(f"Error updating IPSet {name}: {str(e)}")
        raise


def send_telegram_notification(bot_token: str, chat_id: str, total_ips: int, risky_ips: int, added_ips: int, context=None, cache_stats=None):
    """Send Telegram notification about the IP check results"""
    try:
        request_id = context.aws_request_id if context else 'Unknown'

        # Build cache efficiency message
        cache_message = ""
        if cache_stats and cache_stats.get("checked_count", 0) &amp;gt; 0:
            cache_efficiency = (cache_stats["cache_hits"] / cache_stats["checked_count"] * 100)
            cache_message = f"""
💾 Cache Performance:
   • Cache hits: {cache_stats["cache_hits"]}
   • API calls: {cache_stats["api_calls"]}  
   • Efficiency: {cache_efficiency:.1f}%
   • API calls saved: {cache_stats["cache_hits"]}"""

        message = f"""🛡️ Daily IP Check Report

📊 Total IPs processed: {total_ips}
⚠️ Risky IPs found: {risky_ips}
🔒 IPs added to WAF: {added_ips}{cache_message}

Status: ✅ Completed successfully
Request ID: {request_id}
"""

        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        payload = {
            'chat_id': chat_id,
            'text': message,
            'parse_mode': 'HTML'
        }

        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
        logger.info("Telegram notification sent successfully")

    except Exception as e:
        logger.error(f"Error sending Telegram notification: {str(e)}")


def send_error_notification(bot_token: str, chat_id: str, error_message: str, context=None):
    """Send Telegram notification about errors"""
    try:
        request_id = context.aws_request_id if context else 'Unknown'
        message = f"""❌ Daily IP Check Error

Error: {error_message}
Request ID: {request_id}

Please check CloudWatch logs for more details.
"""

        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        payload = {
            'chat_id': chat_id,
            'text': message,
            'parse_mode': 'HTML'
        }

        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()

    except Exception as e:
        logger.error(f"Error sending error notification: {str(e)}")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above code:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Reads a CSV file where IP addresses are stored in the second column (see the sample layout after this list) whenever a file is uploaded to the &lt;strong&gt;proxycheck-waf-automation&lt;/strong&gt; bucket, and de-duplicates them.&lt;/li&gt;
&lt;li&gt;Checks their risk scores with &lt;a href="https://proxycheck.io/examples/" rel="noopener noreferrer"&gt;ProxyCheck's client SDK&lt;/a&gt;. Once a risky IP is detected, it is added to the WAF IP set using the boto3 client.&lt;/li&gt;
&lt;li&gt;Caches scanned IP addresses in ElastiCache Valkey along with their risk scores for 30 days, so we don't scan them again within that period.&lt;/li&gt;
&lt;li&gt;Sends a summary report to your Telegram chat.&lt;/li&gt;
&lt;/ul&gt;
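
&lt;p&gt;For reference, a hypothetical example of the expected CSV layout; only the second column is read, and the header row is skipped automatically because "ip" fails IP validation:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;timestamp,ip,request_uri
2025-09-11T10:02:11Z,203.0.113.45,/wp-login.php
2025-09-11T10:02:13Z,198.51.100.7,/admin
2025-09-11T10:02:14Z,203.0.113.45,/xmlrpc.php
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;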

&lt;blockquote&gt;
&lt;p&gt;You need to set your already created WAF IP set ID in the code. In our case, we're updating AWS WAF to protect CloudFront distributions, with an existing rule that references the IP set to block those addresses. The above example covers IPv4 addresses; you can apply the same principles to IPv6 addresses, as sketched below.&lt;/p&gt;
&lt;/blockquote&gt;
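
&lt;p&gt;A minimal sketch of the IPv6 variant, reusing the &lt;code&gt;update_single_ipset&lt;/code&gt; helper from above and assuming you've created a second IP set with &lt;code&gt;IPAddressVersion='IPV6'&lt;/code&gt; (the set name and ID below are hypothetical):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# risky_ipv6_ips: a list of risky IPv6 addresses you collected (hypothetical).
# IPv6 hosts are expressed as /128 networks instead of /32.
ipv6_networks = {f"{ip}/128" for ip in risky_ipv6_ips}
update_single_ipset(
    wafv2_client,
    "ProxyCheckAutomationV6",  # hypothetical IPv6 IP set name
    "CLOUDFRONT",
    "your_ipv6_ipset_id",      # hypothetical IP set ID
    ipv6_networks,
    is_ipv6=True,
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;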

&lt;p&gt;And below are the results!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fesyf3qkckn1qs0e1r7p8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fesyf3qkckn1qs0e1r7p8.png" alt="AWS WAF ProxyCheck Automation" width="375" height="272"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Thanks for tuning in!&lt;/p&gt;

</description>
      <category>aws</category>
      <category>waf</category>
      <category>security</category>
      <category>cloud</category>
    </item>
    <item>
      <title>Automating On-demand GuardDuty EC2 malware scans</title>
      <dc:creator>Hussein Ayoub</dc:creator>
      <pubDate>Sat, 06 Sep 2025 19:26:47 +0000</pubDate>
      <link>https://forem.com/husseinayoub/automating-on-demand-guardduty-ec2-malware-scans-5029</link>
      <guid>https://forem.com/husseinayoub/automating-on-demand-guardduty-ec2-malware-scans-5029</guid>
      <description>&lt;p&gt;In this post, I'll automate the initiation of EC2 malware scans by GuardDuty, using a simple AWS SAM template.&lt;/p&gt;

&lt;h2&gt;Prerequisites&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;All EC2 instances to be scanned must have their EBS volumes encrypted with an AWS KMS customer managed key (CMK); a quick check is sketched after this list&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In case you need to change the KMS encryption key of an existing EBS volume, check out the following &lt;a href="https://repost.aws/knowledge-center/ebs-change-encryption-key" rel="noopener noreferrer"&gt;resource&lt;/a&gt; for more insights&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;You need the necessary IAM permissions to deploy an AWS SAM application&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
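
&lt;p&gt;To verify the first prerequisite, here's a small sketch (not part of the deployment) that lists attached EBS volumes with their encryption state and KMS key, so you can confirm each one uses your CMK:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import boto3

ec2 = boto3.client('ec2')

# List volumes currently attached to instances, with their KMS key ARNs
paginator = ec2.get_paginator('describe_volumes')
for page in paginator.paginate(
    Filters=[{'Name': 'attachment.status', 'Values': ['attached']}]
):
    for vol in page['Volumes']:
        print(vol['VolumeId'], vol['Encrypted'], vol.get('KmsKeyId', 'not encrypted'))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;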

&lt;h2&gt;Walkthrough&lt;/h2&gt;

&lt;p&gt;We will create a new AWS SAM template file and include the following block of YAML definition to define our Lambda function responsible for triggering the scans:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: &amp;gt;
  Weekly GuardDuty EC2 on-demand malware scan automation

Resources:
  EC2MalwareScan:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: ec2-malware-scan-weekly
      Description: Initiates GuardDuty on-demand malware scans for running EC2 instances
      PackageType: Zip
      Runtime: python3.13
      Handler: ec2_malware_scan_guardduty.ec2_malware_scan
      CodeUri: lambdas/infrastructure/ec2_malware_scan/
      Timeout: 60
      MemorySize: 256
      Tracing: Active
      LoggingConfig:
        LogFormat: JSON
      # CodeSigningConfigArn: !Ref CodeSigningConfig # Optional if you're using code signing already
      Architectures:
        - x86_64
      Events:
        WeeklySchedule:
          Type: ScheduleV2
          Properties:
            ScheduleExpression: 'cron(0 6 ? * MON *)'
            Name: WeeklyEC2MalwareScan
            Description: Weekly EC2 malware scan every Monday at 6 AM UTC
            State: ENABLED
            RetryPolicy:
              MaximumEventAgeInSeconds: 3600
              MaximumRetryAttempts: 2
      Policies:
        - Statement:
            - Sid: Ec2Describe
              Effect: Allow
              Action: ec2:DescribeInstances
              Resource: "*"
            - Sid: GuardDutyScanOnly
              Effect: Allow
              Action:
                - guardduty:ListDetectors
                - guardduty:GetDetector
                - guardduty:StartMalwareScan
              Resource: "*"
            - Sid: IAMPermissions
              Effect: Allow
              Action:
                - iam:GetRole
                - iam:PassRole
              Resource: "arn:aws:iam::*:role/aws-service-role/malware-protection.guardduty.amazonaws.com/AWSServiceRoleForAmazonGuardDutyMalwareProtection"
            - Sid: StsCaller
              Effect: Allow
              Action: sts:GetCallerIdentity
              Resource: "*"
      Environment:
        Variables:
          EXCLUDED_INSTANCES: ""
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The above template defines our Lambda function running on Python 3.13. It is scheduled to run weekly, every Monday at 6 AM UTC, with a retry policy so the EventBridge schedule retries the Lambda invocation if it fails.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;You can adjust the schedule of the runs according to your needs under the WeeklySchedule event&lt;/p&gt;
&lt;/blockquote&gt;
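
&lt;p&gt;For example, the same event block accepts any EventBridge Scheduler expression; two hypothetical alternatives:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Every day at 03:00 UTC
ScheduleExpression: 'cron(0 3 * * ? *)'

# Or a simple rate expression
ScheduleExpression: 'rate(7 days)'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;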

&lt;p&gt;The IAM permissions attached to the lambda function include the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;EC2 Describe&lt;/strong&gt; – Allows the function to list running EC2 instances (&lt;code&gt;ec2:DescribeInstances&lt;/code&gt;).
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;GuardDuty Malware Scan&lt;/strong&gt; – Grants access to list detectors, get detector details, and start on-demand malware scans (&lt;code&gt;guardduty:ListDetectors&lt;/code&gt;, &lt;code&gt;guardduty:GetDetector&lt;/code&gt;, &lt;code&gt;guardduty:StartMalwareScan&lt;/code&gt;).
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;IAM Role Access&lt;/strong&gt; – Permits the function to read and pass the GuardDuty service-linked role required for malware protection (&lt;code&gt;iam:GetRole&lt;/code&gt;, &lt;code&gt;iam:PassRole&lt;/code&gt;).
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;STS Identity Check&lt;/strong&gt; – Enables the function to retrieve its own AWS identity for logging and context (&lt;code&gt;sts:GetCallerIdentity&lt;/code&gt;).
&lt;/li&gt;
&lt;/ul&gt;




&lt;p&gt;Now, we need to create a Python file to store our code that will launch the automated scans, by leveraging boto3 to interact with GuardDuty.&lt;/p&gt;

&lt;p&gt;The file needs to be created under &lt;code&gt;lambdas/infrastructure/ec2_malware_scan/&lt;/code&gt; (the directory we set as &lt;code&gt;CodeUri&lt;/code&gt;) and should be named &lt;code&gt;ec2_malware_scan_guardduty.py&lt;/code&gt;, matching the module part of &lt;code&gt;Handler: ec2_malware_scan_guardduty.ec2_malware_scan&lt;/code&gt; in the SAM template&lt;/p&gt;
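
&lt;p&gt;In other words, the handler follows Lambda's &lt;code&gt;&amp;lt;module&amp;gt;.&amp;lt;function&amp;gt;&lt;/code&gt; convention, so the project layout looks like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;.
├── template.yaml
└── lambdas/
    └── infrastructure/
        └── ec2_malware_scan/
            └── ec2_malware_scan_guardduty.py   # defines ec2_malware_scan(event, context)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;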

&lt;p&gt;Below is the code needed in your Python script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import boto3
import json
import logging
import os
from typing import List, Dict, Any

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize AWS clients
ec2 = boto3.client('ec2')
guardduty = boto3.client('guardduty')
sts = boto3.client('sts')

def _get_detector_id() -&amp;gt; str:
    resp = guardduty.list_detectors()
    ids = resp.get('DetectorIds', [])
    if not ids:
        raise RuntimeError("No GuardDuty detectors found in this region")
    return ids[0]

def _malware_protection_enabled() -&amp;gt; bool:
    detector_id = _get_detector_id()
    det = guardduty.get_detector(DetectorId=detector_id)
    ebs = (
        det.get('DataSources', {})
           .get('MalwareProtection', {})
           .get('ScanEc2InstanceWithFindings', {})
           .get('EbsVolumes', {})
    )
    enabled = ebs.get('Status') == 'ENABLED'
    if enabled:
        logger.info("GuardDuty Malware Protection (EBS) is ENABLED")
    else:
        logger.warning("GuardDuty Malware Protection (EBS) is DISABLED")
    return enabled

def _running_instances() -&amp;gt; List[Dict[str, Any]]:
    account_id = sts.get_caller_identity()["Account"]
    region = ec2.meta.region_name
    paginator = ec2.get_paginator('describe_instances')
    pages = paginator.paginate(
        Filters=[
            {'Name': 'instance-state-name', 'Values': ['running']},
            {'Name': 'tag:Name', 'Values': ['*']}  # note: instances without a Name tag are skipped
        ]
    )
    instances: List[Dict[str, Any]] = []
    for page in pages:
        for r in page.get('Reservations', []):
            for inst in r.get('Instances', []):
                iid = inst['InstanceId']
                instances.append({
                    'InstanceId': iid,
                    'Arn': f"arn:aws:ec2:{region}:{account_id}:instance/{iid}",
                })

    logger.info(f"Found {len(instances)} running EC2 instances")
    return instances

def _start_scan(instance_arn: str) -&amp;gt; Dict[str, Any]:
    try:
        resp = guardduty.start_malware_scan(ResourceArn=instance_arn)
        return {'instance_arn': instance_arn, 'scan_id': resp['ScanId'], 'status': 'started'}
    except Exception as e:
        logger.error(f"Failed to start scan for {instance_arn}: {e}")
        return {'instance_arn': instance_arn, 'status': 'failed', 'error': str(e)}

def ec2_malware_scan(event, context):
    logger.info("Weekly GuardDuty EC2 malware scan kickoff")

    if not _malware_protection_enabled():
        return {
            'statusCode': 400,
            'body': json.dumps({
                'error': 'GuardDuty Malware Protection is not enabled',
                'message': 'Enable Malware Protection (EBS volumes) in GuardDuty'
            })
        }

    instances = _running_instances()
    # If you want to exclude instances from the scan, you can add their InstanceId to the EXCLUDED_INSTANCES environment variable
    excluded = {x.strip() for x in os.environ.get('EXCLUDED_INSTANCES', '').split(',') if x.strip()}
    to_scan = [i for i in instances if i['InstanceId'] not in excluded]

    logger.info(f"Excluded {len(instances) - len(to_scan)} instance(s); starting scans for {len(to_scan)}")

    results = [_start_scan(i['Arn']) for i in to_scan]

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'On-demand malware scans initiated',
            'instances_scanned': len(results),
            'instances_excluded': len(instances) - len(to_scan),
            'results': results
        })
    }

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;You can exclude instances from being scanned by adding their IDs to the EXCLUDED_INSTANCES environment variable.&lt;/p&gt;
&lt;/blockquote&gt;
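
&lt;p&gt;For example, with hypothetical instance IDs, the variable is a comma-separated list:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Environment:
  Variables:
    EXCLUDED_INSTANCES: "i-0123456789abcdef0,i-0fedcba9876543210"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;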

&lt;p&gt;Triggering the Lambda function manually (for example with the CLI one-liner below) will run the &lt;a href="https://eu-central-1.console.aws.amazon.com/guardduty/home?region=eu-central-1#/scans" rel="noopener noreferrer"&gt;scans&lt;/a&gt;, and you'll hopefully get no malware-infected instances :)&lt;/p&gt;
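
&lt;p&gt;A one-liner to kick off a manual run with the AWS CLI (using the function name from the template above):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;aws lambda invoke --function-name ec2-malware-scan-weekly /dev/stdout
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;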

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmgh34pgtj6c1r2h8crgg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fmgh34pgtj6c1r2h8crgg.png" alt="AWS EC2 Malware Scanning with GuardDuty" width="800" height="293"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Thanks for tuning in!&lt;/p&gt;

</description>
      <category>aws</category>
      <category>guardduty</category>
      <category>ec2</category>
    </item>
  </channel>
</rss>
