From d44b07301470bce3acafc883f6b91e3b92fc035a Mon Sep 17 00:00:00 2001 From: kevross33 Date: Fri, 27 Feb 2026 23:10:26 +0000 Subject: [PATCH] Fix ransomware message and add new mass write signature Fix sig so it works correctly when using NTWriteFile instead of appearing in dropped files (we return True immediately on one hit to avoid marking them all). Then add a new signatures for mass copying/writing the same likely ransomware message across multiple directories. It will mark the first 5 calls but display the count to avoid it returning potentially hundreds or even thousands of marked calls. BASTA --- .../signatures/windows/ransomware_message.py | 309 +++++++++++------- 1 file changed, 195 insertions(+), 114 deletions(-) diff --git a/modules/signatures/windows/ransomware_message.py b/modules/signatures/windows/ransomware_message.py index 1a832c04..078e6297 100644 --- a/modules/signatures/windows/ransomware_message.py +++ b/modules/signatures/windows/ransomware_message.py @@ -20,7 +20,6 @@ except ImportError: import re - class RansomwareMessage(Signature): name = "ransomware_message" description = "Writes a potential ransom message to disk" @@ -32,38 +31,81 @@ class RansomwareMessage(Signature): ttps = ["T1486"] mbcs = ["OB0008", "E1486", "OC0001", "C0016"] - filter_apinames = {"NtWriteFile"} + filter_apinames = {"NtWriteFile", "WriteFile"} def __init__(self, *args, **kwargs): Signature.__init__(self, *args, **kwargs) self.ret = False self.indicators = [ - "your files", - "your data", - "your documents", - "restore files", - "restore data", - "restore the files", - "restore the data", - "recover files", - "recover data", - "recover the files", - "recover the data", + ".onion", + "aes 128", + "aes 256", + "aes-128", + "aes-256", + "aes128", + "aes256", + "all data", + "attention!", + "bit coin", + "bitcoin", + "bootkit", + "btc", + "decrypt", + "decrypter", + "decryptor", + "device id", + "download tor", + "encrypt", + "encrypted", + "encryption id", + "enter code", + "ethereum", + "get back my", + "get back your", + "hardwareid", "has been locked", - "pay fine", + "install tor", + "localbitcoins", + "military grade encryption", "pay a fine", + "pay fine", "pay the fine", - "decrypt", - "encrypt", + "payment", + "personal code", + "personal id", + "personal identification code", + "personal identifier", + "personal key", + "private code", + "private key", + "ransom", + "recover data", + "recover files", + "recover my", + "recover personal", + "recover the data", + "recover the files", "recover them", "recover your", - "recover personal", - "bitcoin", - "secret server", + "restore data", + "restore files", + "restore system", + "restore the data", + "restore the files", + "restore the system", + "rootkit", + "rsa 1024", + "rsa 2048", + "rsa 4096", + "rsa-1024", + "rsa-2048", + "rsa-4096", + "rsa1024", + "rsa2048", + "rsa4096", "secret internet server", - "install tor", - "download tor", + "secret server", "tor browser", "tor gateway", "tor-browser", @@ -72,128 +114,167 @@ def __init__(self, *args, **kwargs): "torgateway", "torproject.org", "tox.chat", - "ransom", - "bootkit", - "rootkit", - "payment", + "unique id", + "unique key", "victim", - "AES128", - "AES256", - "AES 128", - "AES 256", - "AES-128", - "AES-256", - "RSA1024", - "RSA2048", - "RSA4096", - "RSA 1024", - "RSA 2048", - "RSA 4096", - "RSA-1024", - "RSA-2048", - "RSA-4096", - "private key", - "personal key", + "wallet address", + "what happened", "your code", - "private code", - "personal code", - "enter code", - "your key", - "unique key", + "your data", "your database", - "encrypted", - "bit coin", - "BTC", - "ethereum", - "what happened", - "what happened", - "decryptor", - "decrypter", - "personal ID", - "unique ID", - "encryption ID", - "device ID", - "hardwareid", - "recover my", - "wallet address", - "localbitcoins", - "Attention!", - "restore the system", - "restore system", - "military grade encryption", - "personal identifier", - "personal identification code", - "get back my", - "get back your", - "your network", + "your documents", + "your files", + "your key", + "your network" ] - indicators_bytes = [i.encode("utf-8").lower() for i in self.indicators] - pattern_bytes = b"|".join(re.escape(i) for i in indicators_bytes) - self.regex = re.compile(pattern_bytes) + indicators_str = [re.escape(i.lower()) for i in self.indicators] + pattern_str = "|".join(indicators_str) + self.regex = re.compile(pattern_str) def on_call(self, call, process): - filepath = self.get_argument(call, "HandleName") - + filepath = self.get_argument(call, "HandleName") or self.get_argument(call, "FileName") + if not filepath: return - + filepath_lower = filepath.lower() - + is_target_path = ( - filepath_lower == "\\??\\physicaldrive0" - or filepath_lower.startswith("\\device\\harddisk") - or filepath_lower.endswith((".txt", ".html", ".hta", ".rtf")) - or "readme" in filepath_lower - or "read_me" in filepath_lower - or "decrypt" in filepath_lower + filepath_lower == "\\??\\physicaldrive0" or + filepath_lower.startswith("\\device\\harddisk") or + filepath_lower.endswith((".txt", ".html", ".hta", ".rtf")) or + "readme" in filepath_lower or + "read_me" in filepath_lower or + "decrypt" in filepath_lower ) - + if not is_target_path: return - buff = self.get_raw_argument(call, "Buffer") - if buff and len(buff) >= 128: - buff_lower = buff.lower() - matches = set(self.regex.findall(buff_lower)) + buff = self.get_argument(call, "Buffer") + + if buff: + if isinstance(buff, bytes) or isinstance(buff, bytearray): + buff_str = bytes(buff).decode('utf-8', errors='ignore') + else: + buff_str = str(buff) - if len(matches) > 1: - self.data.append({"ransom_note": filepath}) - self.data.append({"beginning_of_ransom_message": buff}) - - if self.pid: + if len(buff_str) >= 32: + buff_lower = buff_str.lower() + matches = set(self.regex.findall(buff_lower)) + + if len(matches) > 1: self.mark_call() - self.ret = True + return True def on_complete(self): if not self.ret and "dropped" in self.results: for dropped in self.results["dropped"]: - + raw_name = dropped.get("name", "") if isinstance(raw_name, list) and len(raw_name) > 0: filename = str(raw_name[0]).lower() else: filename = str(raw_name).lower() - - if ( - filename.endswith((".txt", ".html", ".hta", ".rtf")) - or "read_me" in filename - or "readme" in filename - or "read-me" in filename - ): + + if filename.endswith((".txt", ".html", ".hta", ".rtf")) or "read_me" in filename or "readme" in filename: filedata = dropped.get("data") + + if filedata: + if isinstance(filedata, bytes) or isinstance(filedata, bytearray): + filedata_str = bytes(filedata).decode('utf-8', errors='ignore') + else: + filedata_str = str(filedata) + + if len(filedata_str) >= 32: + filedata_lower = filedata_str.lower() + matches = set(self.regex.findall(filedata_lower)) + + if len(matches) > 1: + self.data.append({"ransom_note": filename}) + self.data.append({"beginning_of_ransom_message": filedata_str}) + self.ret = True + break - if isinstance(filedata, str): - filedata = filedata.encode("utf-8", errors="ignore") + return self.ret - if filedata and len(filedata) >= 128: - filedata_lower = filedata.lower() - matches = set(self.regex.findall(filedata_lower)) - if len(matches) > 1: - self.data.append({"ransom_note": filename}) - self.data.append({"beginning_of_ransom_message": filedata}) - self.ret = True - break +class MassRansomNoteDrop(Signature): + name = "mass_ransom_note_drop" + description = "Writes or copies the same ransom note filename across multiple directories" + severity = 3 + categories = ["ransomware"] + authors = ["Kevin Ross"] + minimum = "1.3" + evented = True + ttps = ["T1486"] + mbcs = ["OB0008", "E1486"] + + filter_apinames = set([ + "NtWriteFile", "WriteFile", + "CopyFileA", "CopyFileW", "CopyFileExA", "CopyFileExW", + "MoveFileA", "MoveFileW", "MoveFileExA", "MoveFileExW" + ]) + def __init__(self, *args, **kwargs): + Signature.__init__(self, *args, **kwargs) + self.ret = False + self.marked_calls = 0 + self.dropped_notes = {} + self.note_keywords = ("readme", "read_me", "decrypt", "restore", "instructions", "recover") + self.extensions = (".txt", ".html", ".hta", ".rtf", ".url") + + def on_call(self, call, process): + pid = process.get("process_id") + + filepath = self.get_argument(call, "NewFileName") or self.get_argument(call, "HandleName") or self.get_argument(call, "FileName") + + if not isinstance(filepath, str): + return + + filepath = filepath.replace("/", "\\") + if "\\" not in filepath: + return + + dirname, _, filename = filepath.rpartition("\\") + filename_lower = filename.lower() + + if not filename_lower.endswith(self.extensions): + return + + if not any(kw in filename_lower for kw in self.note_keywords): + return + + dirname_lower = dirname.lower() + + if pid not in self.dropped_notes: + self.dropped_notes[pid] = {} + + if filename_lower not in self.dropped_notes[pid]: + self.dropped_notes[pid][filename_lower] = set() + + if dirname_lower in self.dropped_notes[pid][filename_lower]: + return + + self.dropped_notes[pid][filename_lower].add(dirname_lower) + dir_count = len(self.dropped_notes[pid][filename_lower]) + + if dir_count >= 2 and self.marked_calls < 5: + self.mark_call() + self.marked_calls += 1 + + if dir_count >= 5: + self.ret = True + + def on_complete(self): + if self.ret: + for pid, notes in self.dropped_notes.items(): + for note_name, dirs in notes.items(): + if len(dirs) >= 5: + self.data.append({ + "ransom_note": note_name, + "pid": pid, + "directories_count": len(dirs) + }) return self.ret