close

DEV Community

Oleksandr Kuryzhev
Oleksandr Kuryzhev

Posted on • Originally published at kuryzhev.cloud

Fix Untagged AWS Resources Automatically with Python and Boto3

Originally published on kuryzhev.cloud


If your AWS Cost Explorer has a fat "No Tag" line item and nobody on the team knows who owns those resources, your tagging strategy isn't a policy problem — it's an automation gap you can close in an afternoon. AWS resource tagging automation with Python and Boto3 is the fastest path from chaos to chargeback clarity. This runbook walks through diagnosing the gap, fixing existing drift, and making sure it never comes back.

Symptoms — Your AWS Bill Has Untagged Resources and You Don't Know Who Owns Them

The first sign is always the same: Cost Explorer shows a four-figure "No Tag" bucket and every team points at someone else. Here's what that situation looks like in concrete terms.

Cost Explorer's tag-based cost allocation report shows significant spend under the No Tag grouping for keys like Environment, Owner, and CostCenter. Finance wants a chargeback report. Engineering can't produce one. The argument goes in circles.

Meanwhile, AWS Config's required-tags managed rule is firing alerts — maybe dozens per day. But because there's no automated remediation wired up, those alerts turn into Jira tickets that sit in a backlog nobody prioritizes. The ticket count grows faster than the team can manually fix instances.

New resources keep arriving. Engineers spin up EC2 instances through the console at 11pm during an incident. A Terraform module someone copied from Stack Overflow doesn't include a tags block. A Lambda function gets created by a CI pipeline that was never updated to pass required tags. Each of these becomes a ghost resource — running, costing money, and owned by nobody according to your billing data.

The longer this runs, the worse the audit trail. By the time someone investigates, the IAM principal that created the resource has been recycled and the CloudTrail event is beyond your retention window. You're left guessing.

Root Cause — Why Tagging Enforcement Fails at Scale

Three structural failures cause untagged resources to accumulate. Fixing one without addressing the others just shifts the problem.

Tag policies in audit mode, not enforcement mode. AWS Organizations Tag Policies exist in most mature accounts — but they're almost always set to "audit" mode. Resources get created, the policy violation is logged, and nothing stops it. The policy runs after resource creation, not before. Audit mode generates visibility, not compliance.

Boto3 tagging scripts that run silently fail. Most teams have some version of a tagging script. It runs on a cron. It worked once. Then an IAM policy change broke it, or a new region was added and nobody updated the region list, or pagination was never implemented so it only tags the first 100 resources and stops. No one notices because there's no alerting on script exit codes and no audit log of what was actually tagged.

IAM permission gaps that produce silent no-ops. This one is subtle and painful. The Lambda or script role often has ec2:CreateTags but is missing tag:TagResources — the permission required by the unified resourcegroupstaggingapi. The error is: botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the TagResources operation. If that's not logged and alerted on, the function returns successfully from the Lambda handler's perspective while silently doing nothing. I've seen this run undetected for six weeks in a production account.

Fix #1 — Audit Existing Untagged Resources with a Boto3 Scanner

Before you fix anything, you need to know the full scope. This script uses the resourcegroupstaggingapi client — the correct unified API — to scan every resource across multiple regions concurrently and export a CSV of everything missing required tag keys.

Watch out for: get_resources() silently returns only 100 results per call if you don't paginate. This is the single most common bug in tagging scripts. The script below handles it correctly with a PaginationToken loop. Without it, you'll think you have 80 untagged resources when you actually have 800.

The scanner uses concurrent.futures.ThreadPoolExecutor with a max of 5 workers. Scanning 6 regions sequentially takes 8–12 minutes and generates roughly 2,000 API calls. With 5 concurrent workers it finishes in under 90 seconds and stays within free-tier API rate limits. Don't go higher than 5 workers — I tested 10 and hit ThrottlingException in accounts with large resource counts.

# tag_auditor.py — Boto3 scanner for untagged AWS resources
# Requires: boto3>=1.34.0, Python 3.11+
# Usage: python tag_auditor.py --regions us-east-1 eu-west-1 --output untagged.csv

import boto3
import csv
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# Required tag keys — update to match your org's tag policy
REQUIRED_TAGS = {"Environment", "Owner", "CostCenter"}

def get_untagged_resources(region: str) -> list[dict]:
    """
    Scan a single region for resources missing any required tag key.
    Uses resourcegroupstaggingapi for unified multi-service coverage.
    """
    client = boto3.client("resourcegroupstaggingapi", region_name=region)
    untagged = []
    pagination_token = ""

    while True:
        kwargs = {
            "ResourcesPerPage": 100,  # max allowed per call
            "TagFilters": [],         # empty = return ALL resources
        }
        # Only pass PaginationToken if we have one — API rejects empty string
        if pagination_token:
            kwargs["PaginationToken"] = pagination_token

        try:
            response = client.get_resources(**kwargs)
        except client.exceptions.InvalidParameterException as e:
            print(f"[{region}] InvalidParameterException: {e}")
            break

        for resource in response.get("ResourceTagMappingList", []):
            arn = resource["ResourceARN"]
            existing_keys = {tag["Key"] for tag in resource.get("Tags", [])}
            missing_keys = REQUIRED_TAGS - existing_keys

            if missing_keys:
                untagged.append({
                    "arn": arn,
                    "region": region,
                    "missing_tags": ", ".join(sorted(missing_keys)),
                    "scanned_at": datetime.utcnow().isoformat(),
                })

        pagination_token = response.get("PaginationToken", "")
        if not pagination_token:
            break  # no more pages

    print(f"[{region}] Found {len(untagged)} untagged resources")
    return untagged


def scan_all_regions(regions: list[str]) -> list[dict]:
    """
    Run region scans concurrently — max 5 workers to avoid API throttling.
    """
    all_results = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(get_untagged_resources, r): r for r in regions}
        for future in as_completed(futures):
            try:
                all_results.extend(future.result())
            except Exception as e:
                print(f"Error scanning region {futures[future]}: {e}")
    return all_results


def write_csv(results: list[dict], output_path: str) -> None:
    if not results:
        print("No untagged resources found.")
        return
    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["arn", "region", "missing_tags", "scanned_at"])
        writer.writeheader()
        writer.writerows(results)
    print(f"Report written to {output_path} — {len(results)} resources need tagging")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Audit untagged AWS resources")
    parser.add_argument("--regions", nargs="+", default=["us-east-1"], help="AWS regions to scan")
    parser.add_argument("--output", default="untagged_resources.csv", help="Output CSV path")
    args = parser.parse_args()

    results = scan_all_regions(args.regions)
    write_csv(results, args.output)

Run this, push the CSV to S3, and share it with the team. Now everyone sees the actual scope. That number usually ends the debate about whether this is worth fixing.

One more gotcha: tag key names are case-sensitive. environment and Environment are two different keys in AWS. Standardize your required key names in a constants file — not a wiki page — and import that file everywhere. If it lives only in documentation, it will drift.

Fix #2 — Auto-Tag Resources on Creation with an EventBridge + Lambda Pipeline

The audit tells you about the past. This fix handles the future. The pattern: EventBridge listens for CloudTrail RunInstances, CreateDBInstance, and CreateFunction events, then triggers a Lambda that applies ownership tags immediately at creation time.

The Lambda extracts the userIdentity block from the CloudTrail event detail — this gives you the IAM principal ARN of whoever or whatever created the resource. That becomes the Owner tag. No more guessing.

Critical prerequisite: CloudTrail must be enabled with Include management events = Write. A read-only trail will not capture RunInstances. Also — and this catches people constantly — if your CloudTrail is single-region and your EventBridge rule is in us-east-1, resources created in eu-west-1 will never trigger the Lambda. Use a multi-region trail. See the AWS CloudTrail multi-region documentation for setup details.

The Lambda execution role minimum permissions: ec2:DescribeInstances, tag:TagResources, logs:CreateLogGroup, logs:PutLogEvents. Nothing more. Scope the tag:TagResources with aws:ResourceTag condition keys — an unrestricted tag:TagResources on * lets this function overwrite security-sensitive tags like data-classification or backup-policy. That's a privilege escalation vector you don't want.

# lambda_auto_tagger.py — EventBridge-triggered Lambda for tagging resources on creation
# Deploy as Lambda function; trigger via EventBridge rule on CloudTrail RunInstances events
# Runtime: Python 3.12 | Memory: 128 MB | Timeout: 30s

import boto3
import logging
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Default fallback tags applied when owner cannot be determined from event
DEFAULT_TAGS = {
    "Environment": os.environ.get("DEFAULT_ENV", "unknown"),
    "CostCenter":  os.environ.get("DEFAULT_COST_CENTER", "unassigned"),
}

def extract_owner(event_detail: dict) -> str:
    """Pull principal identity from CloudTrail userIdentity block."""
    identity = event_detail.get("userIdentity", {})
    # Prefer assumed-role ARN; fall back to IAM user ARN; then 'unknown'
    return (
        identity.get("arn")
        or identity.get("userName", "unknown")
    )

def build_resource_arns(event_detail: dict, region: str, account_id: str) -> list[str]:
    """Extract EC2 instance ARNs from RunInstances response items."""
    arns = []
    items = event_detail.get("responseElements", {}).get("instancesSet", {}).get("items", [])
    for item in items:
        instance_id = item.get("instanceId")
        if instance_id:
            # Full ARN required by tag_resources() — instance ID alone causes InvalidResourceId
            arns.append(f"arn:aws:ec2:{region}:{account_id}:instance/{instance_id}")
    return arns

def handler(event, context):
    detail      = event.get("detail", {})
    region      = event.get("region", "us-east-1")
    account_id  = event.get("account", "")
    owner       = extract_owner(detail)
    arns        = build_resource_arns(detail, region, account_id)

    if not arns:
        logger.warning("No resource ARNs extracted from event — skipping")
        return {"status": "skipped", "reason": "no_arns"}

    tags = {**DEFAULT_TAGS, "Owner": owner}
    tagging_client = boto3.client("resourcegroupstaggingapi", region_name=region)

    # tag_resources() accepts max 20 ARNs per call — batch accordingly
    for i in range(0, len(arns), 20):
        batch = arns[i:i+20]
        try:
            resp = tagging_client.tag_resources(ResourceARNList=batch, Tags=tags)
            failed = resp.get("FailedResourcesMap", {})
            if failed:
                logger.error(f"Failed to tag resources: {failed}")
        except tagging_client.exceptions.InvalidParameterException as e:
            # Resource may be in terminal state (deleting) — log and continue
            logger.warning(f"InvalidParameterException for batch {batch}: {e}")

    logger.info(f"Tagged {len(arns)} instance(s) with Owner={owner}")
    return {"status": "ok", "tagged_count": len(arns)}

I stopped using service-specific tag APIs (ec2:create_tags, lambda:tag_resource) after spending two hours debugging why RDS instances weren't getting tagged when EC2 instances were. The unified resourcegroupstaggingapi handles EC2, RDS, Lambda, S3, and more in a single client. It's roughly 60% fewer API calls in multi-service environments. Use it exclusively.

Fix #3 — Enforce Tag Compliance with AWS Config + Boto3 Auto-Remediation

EventBridge catches resources at creation. But things slip through — manual console actions, API calls from services that don't emit the events you're watching, or resources created before you deployed the EventBridge rule. AWS Config is your safety net.

Deploy the required-tags managed Config rule scoped to AWS::EC2::Instance, AWS::RDS::DBInstance, and AWS::Lambda::Function. The rule supports up to 6 tag key/value pairs per deployment. Set evaluation frequency to 24 hours — continuous evaluation generates excessive Config API costs for large accounts.

Wire the AWS-TagResource SSM Automation document as the remediation action. Pass default tag values (Environment=unknown, Owner=unassigned) as remediation parameters. This keeps your cost reports clean — a resource tagged Owner=unassigned shows up in filtered reports and triggers follow-up. A resource with no tag at all is invisible.

Watch out for this one: The SSM remediation role needs both config:StartRemediationExecution AND tag:TagResources. Missing the second permission produces a silent no-op. Config shows the remediation as "in progress" indefinitely. There's no error surfaced in the Config console. You'll only catch it by checking CloudWatch Logs for the SSM automation execution. I've seen teams run this misconfigured for months thinking remediation was working.

Also: the AWS-TagResource document requires the full resource ARN in the format arn:aws:ec2:REGION:ACCOUNT:instance/i-XXXXXXXXX. Passing an instance ID alone raises InvalidResourceId. Config passes the ARN correctly if you use the RESOURCE_ID parameter mapping — double-check your parameter bindings in the remediation configuration. See the AWS Config remediation documentation for the full parameter reference.

Prevention — Enforce Tagging at the IaC Layer Before Resources Are Created

Remediation is reactive. Prevention is better. The goal is to make it impossible to deploy an untagged resource through your standard pipelines.

Add a check_required_tags() pre-flight validation function to every Boto3 deployment script. Call it before any create_* API call. If required tags are missing from the payload, fail fast with a clear error message: DeploymentError: Missing required tags: ['CostCenter', 'Owner']. Add them to your deployment config and retry. A clear error at deploy time is infinitely better than a mystery line item in next month's bill.

At the organization level, combine AWS Organizations Tag Policies with Service Control Policies. Set EnforcedFor on specific resource types in Tag Policies, and add an SCP that denies ec2:RunInstances unless the aws:RequestTag/CostCenter condition is present. This hard-blocks non-compliant creates at the API layer — no Lambda, no Config rule, no cron job required. It's the most reliable enforcement mechanism available.

Finally, set up a weekly Cost Explorer report filtered by tag:CostCenter = N/A and alert via SNS if spend exceeds $50. Cost Explorer tag data has a 24-hour activation lag after first application — newly tagged resources won't appear in filtered reports until the next day, so don't panic if your first run still shows spend. This financial early-warning gives you a business-level signal before the next billing cycle closes, and it's the kind of alert that actually gets acted on because it has a dollar amount attached.

AWS resource tagging automation with Python is a weekend project that pays for itself in the first billing cycle. Audit what you have, stop new drift at the creation event, catch stragglers with Config, and enforce at the IaC layer. Stack all four layers and the "No Tag" line item disappears. For more automation patterns on this site, see kuryzhev.cloud.

Related

Top comments (0)