from asyncio import current_task, protocols from dataclasses import dataclass, field from typing import Dict, Any, Optional, Sequence import logging import nio import synapse_admin import asyncio class MatrixAdminClient: def __init__(self, server: str, access_token: str) -> None: protocol = server.split("://")[0] + "://" server = server.split("://")[1] self.user = synapse_admin.User(server, 443, access_token, protocol) self.room = synapse_admin.Room(server, 443, access_token, protocol) self.media = synapse_admin.Media(server, 443, access_token, protocol) self.management = synapse_admin.Management( server, 443, access_token, protocol) class MatrixHelper: ROOM_POWER_LEVEL_EVENTS = { "m.room.encryption": 200, "m.room.avatar": 50, "m.room.name": 50, "m.room.canonical_alias": 50, "m.space.child": 50, "m.room.history_visibility": 50, "m.room.power_levels": 50, "m.room.topic": 50, "m.room.tombstone": 50, "m.room.server_acl": 50, "m.room.pinned_events": 50, "m.reaction": 0, "m.room.redaction": 0, "im.vector.modular.widgets": 50, "io.element.voice_broadcast_info": 50 } def __init__(self, server: str, username, space_id: str, matrix_username: str) -> None: self._client = nio.AsyncClient(server, username) self._loggedIn = False self._space_id = space_id self._server = server self._username = username self._matrix_username = matrix_username def __del__(self): asyncio.run(self.logout()) async def login(self, password): try: await self._client.login(password) self._loggedIn = True logging.info(f"Logged into Matrix as {await self._client.get_displayname()}") rooms = await self._client.joined_rooms() if self._space_id not in rooms.rooms: logging.error("The bot user is not in the space!") return False # admin login self._admin_client = MatrixAdminClient( self._server, self._client.access_token) user = self._admin_client.user.query(self._username) logging.info(f"Logged into Synapse-admin as {user['name']}") return True except Exception as e: print(e) logging.error(f"Error while logging into matrix server!") return False async def logout(self): await self._client.close() async def create_room(self, name, alias, joinable_for_space_members=False, suggested=False): # we not only need to create the room but also have to add it to the space via_domain = self._space_id.split(":")[1] initial_state = [ { "type": "m.space.parent", "state_key": self._space_id, "content": { "canonical": True, "via": [via_domain], } }, { "type": "m.room.history_visibility", "content": {"history_visibility": "shared"} }, { "type": "m.room.power_levels", "content": { "events": MatrixHelper.ROOM_POWER_LEVEL_EVENTS, "invite": 50, "users": { self._matrix_username: 200, } } } ] if joinable_for_space_members: initial_state.insert(0, { "type": "m.room.guest_access", "state_key": "", "content": {"guest_access": "can_join"} }) initial_state.insert(2, { "type": "m.room.join_rules", "content": { "join_rule": "restricted", "allow": [ { "type": "m.room_membership", "room_id": self._space_id } ] } }) room = await self._client.room_create(visibility=nio.RoomVisibility.private, name=name, alias=alias, federate=False, initial_state=initial_state) assert room.room_id is not None # add as child room state_update = await self._client.room_put_state(self._space_id, "m.space.child", { "suggested": suggested, "via": [via_domain], }, state_key=room.room_id) assert state_update.event_id is not None return room async def get_rooms(self): return self._admin_client.room.lists(limit=200, recent_first=False, orderby="joined_local_members") async def get_users(self): return self._admin_client.user.lists() async def get_rooms_of_user(self, username): return self._admin_client.user.joined_room(username) async def add_user_to_room(self, user, room): return self._admin_client.user.join_room(user, room) async def remove_user_from_room(self, user, room): # TODO: Ban users? return await self._client.room_kick(room, user, "You have been removed automatically since you are no longer a member of the group associated with this room. If you think, this is an error, please contact us.") async def get_room_power_levels(self, room): current_state = await self._client.room_get_state_event(room, "m.room.power_levels") return current_state.content['users'] async def set_user_power_level_in_room(self, user, room, power_level): if user == self._matrix_username: return False if power_level != 100 and power_level != 50 and power_level != 0: return False users = await self.get_room_power_levels(room) if not self._matrix_username in users or users[self._matrix_username] < 100: self._admin_client.room.set_admin(room, self._matrix_username) users = await self.get_room_power_levels(room) users[user] = power_level room_power_level_events = MatrixHelper.ROOM_POWER_LEVEL_EVENTS for key, value in room_power_level_events.items(): if value > users[self._matrix_username]: room_power_level_events[key] = users[self._matrix_username] result = await self._client.room_put_state(room, "m.room.power_levels", { "events": room_power_level_events, "invite": 50, "users": users }) if isinstance(result, nio.ErrorResponse): logging.error(f"Error while setting power level of user {user} in room {room} to {power_level}") logging.error(result) logging.error(f"Current power levels: {users}") return False return True