diff --git a/patterns/leaktk/1/opa_policy.rego b/patterns/leaktk/1/opa_policy.rego index e8b94954..8f734406 100644 --- a/patterns/leaktk/1/opa_policy.rego +++ b/patterns/leaktk/1/opa_policy.rego @@ -22,6 +22,23 @@ auth_bearer_token_valid(opts) if { }).status_code < 300 } +default valid_status_code(status_code, valid_range, invalid_range) := null + +valid_status_code(status_code, valid_range, invalid_range) := false if { + status_code >= invalid_range[0] + status_code <= invalid_range[1] +} + +valid_status_code(status_code, valid_range, invalid_range) := true if { + status_code >= valid_range[0] + status_code <= valid_range[1] +} + +# Helper for accessing a field if a condition is true +default get_if(object, field, condition) := null + +get_if(object, field, condition) := object[field] if condition + container_registry_auth_opts(hostname) := opts if { lower(hostname) == "docker.io" opts := { @@ -184,3 +201,62 @@ analyzed_findings contains analyzed_finding if { "token": finding.secret, })}) } + +# OpenAI API Keys +analyzed_findings contains analyzed_finding if { + some finding in findings + contains(lower(finding.rule.description), "openai") + regex.match(`^sk-[\w\-]{16,}T3BlbkFJ[\w\-]{16,}$`, finding.secret) + resp := http.send({ + "url": "https://api.openai.com/v1/models", + "method": "GET", + "headers": {"Authorization": sprintf("Bearer %s", [finding.secret])}, + }) + analyzed_finding := object.union(finding, { + "valid": resp.status_code == 200, + "analysis": {"status_code": resp.status_code}, + }) +} + +# Groq API Keys +analyzed_findings contains analyzed_finding if { + some finding in findings + contains(lower(finding.rule.description), "groq") + regex.match(`^gsk_[A-Za-z0-9]{52}$`, finding.secret) + resp := http.send({ + "url": "https://api.groq.com/openai/v1/models", + "method": "GET", + "headers": {"Authorization": sprintf("Bearer %s", [finding.secret])}, + }) + analyzed_finding := object.union(finding, { + "valid": valid_status_code(resp.status_code, [200, 200], [400, 499]), + "analysis": {"status_code": resp.status_code}, + }) +} + +# GitLab Personal Access Tokens +analyzed_findings contains analyzed_finding if { + some finding in findings + contains(lower(finding.rule.description), "gitlab") + regex.match(`^glpat-[\w\-]{20}$|^glpat-[\w\-]{32,235}\.[0-9a-z]{2}\.[0-9a-z]{9}$`, finding.secret) + + user_resp := http.send({ + "url": "https://gitlab.com/api/v4/user", + "method": "GET", + "headers": {"Authorization": sprintf("Bearer %s", [finding.secret])}, + }) + + token_resp := http.send({ + "url": "https://gitlab.com/api/v4/personal_access_tokens/self", + "method": "GET", + "headers": {"Authorization": sprintf("Bearer %s", [finding.secret])}, + }) + + analyzed_finding := object.union(finding, { + "valid": user_resp.status_code == 200, + "analysis": { + "user": get_if(user_resp, "body", user_resp.status_code == 200), + "token": get_if(token_resp, "body", token_resp.status_code == 200), + }, + }) +} \ No newline at end of file