Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/n0s1/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.2.1"
__version__ = "1.2.2"
21 changes: 17 additions & 4 deletions src/n0s1/controllers/spark1.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __init__(self, headers: dict = None, server: str = None, options: dict[str,
get_server_info: bool = True, async_: bool = False, async_workers: int = 5,
max_retries: int = 3, timeout: int = None,
auth: tuple[str, str] = None):
self.base_url = "https://api.spark1.us"
self.base_url = "https://n0s1.spark1.us"
# self.base_url = "http://127.0.0.1:5000"
self.local_ip = _get_local_ip()
if server:
Expand Down Expand Up @@ -140,17 +140,30 @@ def is_connected(self, config=None):
return False
return False

def upload_report(self, report: dict):
if report is None:
return None
upload_report_url = self.base_url + "/api/v1/scans"
try:
# Upload the report
r = self._post_request(upload_report_url, json=report)
return r
except Exception as ex:
logging.info(str(ex))
return None


def ai_analysis(self, report=None, sensitive_report=None):
if report is None:
return None
auth_url = self.base_url + "/api/v1/analyses"
ai_analysis_url = self.base_url + "/api/v1/analyses"
updated_report = report

findings = report.get("findings", {})
for id, finding in findings.items():
try:
# AI agent to generate an HTTP request to validate the credential
r = self._post_request(auth_url, json=finding)
r = self._post_request(ai_analysis_url, json=finding)
if r.status_code == 200:
updated_finding = r.json()
req_validator = updated_finding.get("ai_report", {}).get("request_validator", {})
Expand All @@ -173,7 +186,7 @@ def ai_analysis(self, report=None, sensitive_report=None):
for id, finding in findings.items():
try:
# Submit the updated report findings with the HTTP responses so the AI agent can confirm which credentials were valid
r = self._post_request(auth_url, json=finding)
r = self._post_request(ai_analysis_url, json=finding)
if r.status_code == 200:
analyzed_finding = r.json()
analyzed_report["findings"][id] = analyzed_finding
Expand Down
8 changes: 4 additions & 4 deletions src/n0s1/n0s1.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,12 @@ def main():
scope_config = secret_scanner.get_scope_config()
if scope_config:
if args.map_file:
scanner.log_message(f"Running scoped scan using map file [{args.map_file}]. Scan scope:", level=logging.INFO)
scanner.log_message(f"Running scoped scan using map file [{args.map_file}]. Scan scope:",
level=logging.INFO)
else:
scanner.log_message(f"Running scoped scan using search query:", level=logging.INFO)
pprint.pprint(scope_config)


report_file = args.report_file

command = args.command
Expand Down Expand Up @@ -430,7 +430,8 @@ def main():
secret_scanner.set(limit=limit)
secret_scanner.set(insecure=insecure)

commands = ["local_scan", "linear_scan", "slack_scan", "asana_scan", "zendesk_scan", "github_scan", "gitlab_scan", "wrike_scan", "jira_scan", "confluence_scan"]
commands = ["local_scan", "linear_scan", "slack_scan", "asana_scan", "zendesk_scan", "github_scan", "gitlab_scan",
"wrike_scan", "jira_scan", "confluence_scan"]
extended_commands = []
for c in commands:
short_c = c.replace("_scan", "")
Expand Down Expand Up @@ -529,6 +530,5 @@ def main():
scanner.log_message("Done!")



if __name__ == "__main__":
main()
37 changes: 31 additions & 6 deletions src/n0s1/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def _setup_regex_config(self):
with open(self.regex_file, "r") as f:
extension = os.path.splitext(self.regex_file)[1]
if extension.lower() == ".yaml".lower():
self.regex_config = yaml.load(f, Loader=yaml.FullLoader)
self.regex_config = yaml.safe_load(f)
else:
self.regex_config = toml.load(f)
else:
Expand All @@ -368,7 +368,7 @@ def _setup_regex_config(self):
def _setup_cfg(self):
if os.path.exists(self.config_file):
with open(self.config_file, "r") as f:
self.cfg = yaml.load(f, Loader=yaml.FullLoader)
self.cfg = yaml.safe_load(f)
else:
self.log_message(f"Config file [{self.config_file}] not found!", level=logging.WARNING)

Expand Down Expand Up @@ -558,6 +558,8 @@ def scan(self):
n0s1_pro = spark1.Spark1(token_auth=N0S1_TOKEN)
if n0s1_pro.is_connected(self.scan_arguments):
mode = "professional"
else:
n0s1_pro = None
message = f"Starting scan in {mode} mode..."
self.log_message(message)

Expand Down Expand Up @@ -599,13 +601,36 @@ def scan(self):
for item_data in data:
if item_data and item_data.lower().find(label.lower()) == -1:
self.scan_text_and_report_leaks(item_data, name, self.regex_config, self.scan_arguments, ticket)
if n0s1_pro and self.ai_analysis:
ai_analyzed_report = n0s1_pro.ai_analysis(self.report_json, self.report_sensitive_json)
if ai_analyzed_report:
self.report_json = ai_analyzed_report
if n0s1_pro:
upload_http_response = n0s1_pro.upload_report(self.report_json)
self._process_report_upload(upload_http_response)
if self.ai_analysis:
ai_analyzed_report = n0s1_pro.ai_analysis(self.report_json, self.report_sensitive_json)
if ai_analyzed_report:
self.report_json = ai_analyzed_report

return self.report_json

def _process_report_upload(self, upload_http_response):
r = upload_http_response
if 200 <= r.status_code < 300:
scan_record = r.json()
report_uuid = scan_record.get("report_uuid", "")
self.report_json["uuid"] = report_uuid
message = f"Uploading report [{self.report_file}] to n0s1.spark1.us ..."
self.log_message(message)
message = f"Upload successful! Report UUID: [{report_uuid}]"
self.log_message(message)
else:
message = f"Unable to upload report to n0s1.spark1.us! HTTP response status: [{r.status_code}]."
self.log_message(message)
try:
response_data = r.json()
self.log_message(str(response_data))
except Exception as ex:
logging.info(str(ex))


def scan_text_and_report_leaks(self, data, name, regex_config, scan_arguments, ticket):
secret_found, scan_text_result = scan_text(regex_config, data)
scan_text_result["ticket_data"] = ticket
Expand Down
Loading