# Custom Check Examples
Real-world examples of custom validation checks.
## Encryption Required
Require encryption for S3 write operations:
from fnmatch import fnmatchcase
from typing import ClassVar

from iam_validator.core.aws_service import AWSServiceFetcher
from iam_validator.core.check_registry import PolicyCheck, CheckConfig
from iam_validator.core.models import Statement, ValidationIssue
class EncryptionRequiredCheck(PolicyCheck):
    """Ensures S3 write operations require encryption.

    An ``Allow`` statement that grants any action listed in
    ``S3_WRITE_ACTIONS`` is reported unless one of its condition blocks
    references the ``s3:x-amz-server-side-encryption`` condition key.

    Actions are compared the way IAM evaluates them: case-insensitively and
    with ``*`` / ``?`` wildcards, so ``s3:*`` or ``s3:putobject`` are treated
    as granting the write actions as well.
    """

    check_id: ClassVar[str] = "s3_encryption_required"
    description: ClassVar[str] = "Ensures S3 writes require encryption"
    default_severity: ClassVar[str] = "high"

    S3_WRITE_ACTIONS = {"s3:PutObject", "s3:PutObjectAcl", "s3:ReplicateObject"}
    ENCRYPTION_CONDITION_KEY = "s3:x-amz-server-side-encryption"

    async def execute(
        self,
        statement: Statement,
        statement_idx: int,
        fetcher: AWSServiceFetcher,
        config: CheckConfig,
    ) -> list[ValidationIssue]:
        """Report S3 write actions granted without an encryption condition.

        Args:
            statement: The policy statement under inspection.
            statement_idx: Index of the statement, echoed into each issue.
            fetcher: Unused by this check.
            config: Check configuration, passed to ``get_severity``.

        Returns:
            One issue per action entry in the statement that grants an S3
            write action; an empty list for non-``Allow`` statements or when
            the statement carries the encryption condition key.
        """
        if statement.effect != "Allow":
            return []

        # The condition applies to the statement as a whole, so evaluate it
        # once instead of once per action.
        if self._has_encryption_condition(statement):
            return []

        return [
            ValidationIssue(
                severity=self.get_severity(config),
                statement_sid=statement.sid,
                statement_index=statement_idx,
                issue_type="missing_encryption",
                message=f"S3 write '{action}' requires encryption",
                action=action,
                suggestion="Add s3:x-amz-server-side-encryption condition",
                line_number=statement.line_number,
            )
            for action in statement.get_actions()
            if self._grants_s3_write(action)
        ]

    def _grants_s3_write(self, action: str) -> bool:
        """Return True if the action entry covers any S3 write action.

        The policy's action entry is used as the pattern so that wildcard
        entries such as ``s3:*`` or ``*`` are recognised. Both sides are
        lower-cased because IAM action names are case-insensitive.
        """
        pattern = action.lower()
        return any(
            fnmatchcase(write_action.lower(), pattern)
            for write_action in self.S3_WRITE_ACTIONS
        )

    def _has_encryption_condition(self, statement: Statement) -> bool:
        """Return True if any condition block names the encryption key.

        Condition key names are compared case-insensitively.

        NOTE(review): only the presence of the key is checked; the operator
        and the required value are not inspected, so e.g. a ``Null`` test on
        the key also counts. Tighten if that matters for your policies.
        """
        if not statement.condition:
            return False

        wanted = self.ENCRYPTION_CONDITION_KEY.lower()
        # Assumes the IAM JSON shape {operator: {condition_key: value}}.
        for keys in statement.condition.values():
            if any(key.lower() == wanted for key in keys):
                return True
        return False
## Production Wildcard Block
Block wildcards in production resources:
from typing import ClassVar
from iam_validator.core.check_registry import PolicyCheck, CheckConfig
from iam_validator.core.aws_service import AWSServiceFetcher
from iam_validator.core.models import Statement, ValidationIssue
class ProductionWildcardCheck(PolicyCheck):
    """Blocks wildcards in production resources.

    A resource is reported when it contains the substring ``production``
    (case-insensitive) together with a ``*`` wildcard. Only ``Allow``
    statements are inspected: a ``Deny`` over a wildcarded production
    resource restricts access rather than granting it.
    """

    check_id: ClassVar[str] = "production_wildcard"
    description: ClassVar[str] = "Blocks wildcards in production"
    default_severity: ClassVar[str] = "critical"

    async def execute(
        self,
        statement: Statement,
        statement_idx: int,
        fetcher: AWSServiceFetcher,
        config: CheckConfig,
    ) -> list[ValidationIssue]:
        """Report wildcarded production resources granted by the statement.

        Args:
            statement: The policy statement under inspection.
            statement_idx: Index of the statement, echoed into each issue.
            fetcher: Unused by this check.
            config: Check configuration, passed to ``get_severity``.

        Returns:
            One issue per matching resource; an empty list for non-``Allow``
            statements or when nothing matches.
        """
        # Same guard as the other example checks: only statements that grant
        # access can over-grant it.
        if statement.effect != "Allow":
            return []

        return [
            ValidationIssue(
                severity=self.get_severity(config),
                statement_sid=statement.sid,
                statement_index=statement_idx,
                issue_type="production_wildcard",
                message=f"Production resource has wildcard: {resource}",
                resource=resource,
                suggestion="Use specific resource identifiers",
                line_number=statement.line_number,
            )
            for resource in statement.get_resources()
            if "*" in resource and "production" in resource.lower()
        ]
## Region Restriction
Enforce approved AWS regions:
from typing import ClassVar
from iam_validator.core.check_registry import PolicyCheck, CheckConfig
from iam_validator.core.aws_service import AWSServiceFetcher
from iam_validator.core.models import Statement, ValidationIssue
class RegionRestrictionCheck(PolicyCheck):
    """Enforces approved AWS regions.

    An ``Allow`` statement is reported unless it carries an
    ``aws:RequestedRegion`` condition, under a positive string operator,
    whose values are all in the configured ``approved_regions`` list.
    The check is a no-op when no approved regions are configured.
    """

    check_id: ClassVar[str] = "region_restriction"
    description: ClassVar[str] = "Enforces approved regions"
    default_severity: ClassVar[str] = "medium"

    REGION_CONDITION_KEY = "aws:RequestedRegion"
    # Operator name prefixes (lower-cased) that limit a request TO the listed
    # values. Negated forms such as StringNotEquals are deliberately absent.
    _RESTRICTING_OPERATOR_PREFIXES = ("stringequals", "stringlike")

    async def execute(
        self,
        statement: Statement,
        statement_idx: int,
        fetcher: AWSServiceFetcher,
        config: CheckConfig,
    ) -> list[ValidationIssue]:
        """Report an ``Allow`` statement not limited to approved regions.

        Args:
            statement: The policy statement under inspection.
            statement_idx: Index of the statement, echoed into the issue.
            fetcher: Unused by this check.
            config: Check configuration; ``config.config["approved_regions"]``
                supplies the allowed region names.

        Returns:
            A single-issue list when the restriction is missing or names an
            unapproved region, otherwise an empty list.
        """
        # ``or []`` tolerates an explicit null/empty value in the config file.
        approved_regions = set(config.config.get("approved_regions") or [])
        if not approved_regions:
            return []

        # Deny statements never grant access, so they need no restriction.
        if statement.effect != "Allow":
            return []

        if self._has_region_condition(statement, approved_regions):
            return []

        return [
            ValidationIssue(
                severity=self.get_severity(config),
                statement_sid=statement.sid,
                statement_index=statement_idx,
                issue_type="missing_region_restriction",
                message="Statement missing region restriction",
                # Sorted so the report text is stable between runs.
                suggestion=(
                    "Add aws:RequestedRegion condition: "
                    f"{sorted(approved_regions)}"
                ),
                line_number=statement.line_number,
            )
        ]

    def _has_region_condition(self, statement: Statement, approved: set) -> bool:
        """Return True if the statement is limited to approved regions.

        A condition qualifies when its operator is a positive string match
        (optionally with a ``ForAnyValue:`` / ``ForAllValues:`` qualifier),
        its key is ``aws:RequestedRegion`` (case-insensitive), and every
        listed value is a member of ``approved``. Wildcard values such as
        ``us-*`` are not expanded and therefore do not qualify.
        """
        if not statement.condition:
            return False

        wanted_key = self.REGION_CONDITION_KEY.lower()
        # Assumes the IAM JSON shape {operator: {condition_key: str | list}}.
        for operator, conditions in statement.condition.items():
            if not self._is_restricting_operator(operator):
                continue
            for key, value in conditions.items():
                if key.lower() != wanted_key:
                    continue
                regions = [value] if isinstance(value, str) else list(value)
                if regions and all(region in approved for region in regions):
                    return True
        return False

    def _is_restricting_operator(self, operator: str) -> bool:
        """Return True for operators that limit a key to the listed values."""
        # Drop a set qualifier ("ForAnyValue:" / "ForAllValues:") if present.
        base_name = operator.rpartition(":")[2].lower()
        return base_name.startswith(self._RESTRICTING_OPERATOR_PREFIXES)
Configuration:
# Settings for the region_restriction check; approved_regions is the list
# the check reads through config.config.get("approved_regions").
checks:
  region_restriction:
    enabled: true
    approved_regions:
      - us-east-1
      - us-west-2
      - eu-west-1
## More Examples
See the examples/custom_checks/ directory for additional examples:
| Check | Description |
|---|---|
| `domain_restriction_check.py` | Restrict S3 access to domains |
| `tag_enforcement_check.py` | Enforce resource tagging |
| `time_based_access_check.py` | Business hours restrictions |
| `cross_account_external_id_check.py` | Confused deputy prevention |