189 lines
6.8 KiB
Python
189 lines
6.8 KiB
Python
from asyncio import current_task, protocols
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, Any, Optional, Sequence
|
|
import logging
|
|
import nio
|
|
import synapse_admin
|
|
import asyncio
|
|
|
|
|
|
class MatrixAdminClient:
    """Bundle of ``synapse_admin`` endpoint objects sharing one server and token.

    After construction the endpoint objects are available as the attributes
    ``user``, ``room``, ``media`` and ``management``.
    """

    # Port used when the server URL does not name one explicitly.
    DEFAULT_PORT = 443

    def __init__(self, server: str, access_token: str) -> None:
        """Create the admin endpoint objects for ``server``.

        Args:
            server: Base URL of the homeserver including the scheme, e.g.
                ``"https://matrix.example.org"`` or ``"http://localhost:8008"``.
            access_token: Access token handed to every endpoint object.

        Raises:
            ValueError: If ``server`` has no scheme, no host, or a malformed port.
        """
        protocol, host, port = self._split_server_url(server)

        self.user = synapse_admin.User(host, port, access_token, protocol)
        self.room = synapse_admin.Room(host, port, access_token, protocol)
        self.media = synapse_admin.Media(host, port, access_token, protocol)
        self.management = synapse_admin.Management(
            host, port, access_token, protocol)

    @staticmethod
    def _split_server_url(server: str) -> tuple:
        """Split a server URL into ``(protocol, host, port)``.

        ``protocol`` keeps the trailing ``"://"`` because that is the form the
        constructor passes on to ``synapse_admin``. An explicit port in the URL
        is honoured; otherwise ``DEFAULT_PORT`` is used. Any path or trailing
        slash is dropped so that it does not end up inside the host name.

        Raises:
            ValueError: If the URL has no scheme, no host, or a malformed port.
        """
        # Imported locally so this block does not depend on a file-level import.
        from urllib.parse import urlsplit

        parts = urlsplit(server)
        if not parts.scheme or not parts.hostname:
            raise ValueError(
                f"Server URL must look like 'https://host[:port]', got {server!r}")

        # ``parts.port`` itself raises ValueError for a non-numeric/out-of-range port.
        port = parts.port if parts.port is not None else MatrixAdminClient.DEFAULT_PORT
        return f"{parts.scheme}://", parts.hostname, port
|
|
|
|
|
|
class MatrixHelper:
    """Helper that manages rooms of one Matrix space.

    It combines a ``nio.AsyncClient`` (regular client API) with a
    ``MatrixAdminClient`` (Synapse admin API). The admin client only exists
    after a successful ``login``.
    """

    # Power level required to send each event type in rooms created by this
    # helper. Treat this mapping as read-only: per-room variants are derived
    # through ``_capped_event_levels`` and never written back.
    ROOM_POWER_LEVEL_EVENTS = {
        "m.room.encryption": 200,
        "m.room.avatar": 50,
        "m.room.name": 50,
        "m.room.canonical_alias": 50,
        "m.space.child": 50,
        "m.room.history_visibility": 50,
        "m.room.power_levels": 50,
        "m.room.topic": 50,
        "m.room.tombstone": 50,
        "m.room.server_acl": 50,
        "m.room.pinned_events": 50,
        "m.reaction": 0,
        "m.room.redaction": 0,
        "im.vector.modular.widgets": 50,
        "io.element.voice_broadcast_info": 50
    }

    # The only power levels ``set_user_power_level_in_room`` will assign.
    _ALLOWED_POWER_LEVELS = (0, 50, 100)

    def __init__(self, server: str, username, space_id: str, matrix_username: str) -> None:
        """Store the connection settings and create the (not yet logged in) client.

        Args:
            server: Homeserver URL; passed to ``nio.AsyncClient`` and later to
                ``MatrixAdminClient``.
            username: User passed to ``nio.AsyncClient`` and used for the admin
                API user query in ``login``.
            space_id: Room ID of the space the managed rooms belong to.
            matrix_username: Identifier of the bot as it appears as a key in
                ``m.room.power_levels`` (presumably the full ``@user:server``
                ID - confirm against callers).
        """
        self._client = nio.AsyncClient(server, username)
        self._loggedIn = False
        self._space_id = space_id
        self._server = server
        self._username = username
        self._matrix_username = matrix_username
        # Created by login(); None until then so the attribute always exists.
        self._admin_client = None

    def __del__(self):
        """Best-effort close of the client connection on garbage collection."""
        # __init__ may have failed before the client was created.
        if getattr(self, "_client", None) is None:
            return
        try:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None
            if loop is None:
                asyncio.run(self.logout())
            else:
                # asyncio.run() refuses to start inside a running loop, so
                # schedule the close on that loop instead.
                loop.create_task(self.logout())
        except Exception:
            # Finalizers must not raise (e.g. during interpreter shutdown).
            logging.debug("Could not close Matrix client during cleanup", exc_info=True)

    async def login(self, password):
        """Log in to Matrix and to the Synapse admin API.

        Args:
            password: Password passed to the nio client's ``login``.

        Returns:
            True when the client logged in, the bot has joined the configured
            space and the admin API user query succeeded; False otherwise.
            Errors are logged, not raised.
        """
        try:
            response = await self._client.login(password)
            # nio reports failures as ErrorResponse objects rather than raising.
            if isinstance(response, nio.ErrorResponse):
                logging.error(f"Matrix login was rejected: {response}")
                return False
            self._loggedIn = True
            logging.info(f"Logged into Matrix as {await self._client.get_displayname()}")

            rooms = await self._client.joined_rooms()
            if isinstance(rooms, nio.ErrorResponse):
                logging.error(f"Could not fetch the joined rooms: {rooms}")
                return False
            if self._space_id not in rooms.rooms:
                logging.error("The bot user is not in the space!")
                return False

            # admin login
            self._admin_client = MatrixAdminClient(
                self._server, self._client.access_token)
            # NOTE(review): this admin call is synchronous inside a coroutine.
            user = self._admin_client.user.query(self._username)

            logging.info(f"Logged into Synapse-admin as {user['name']}")

            return True
        except Exception:
            # logging.exception records the traceback that print(e) used to drop.
            logging.exception("Error while logging into matrix server!")
            return False

    async def logout(self):
        """Close the underlying nio client."""
        await self._client.close()

    @staticmethod
    def _server_name_of(matrix_id):
        """Return the server-name part of a Matrix ID such as ``!room:host:port``.

        Only the first colon separates the local part from the server name, so
        a port in the server name is preserved.
        """
        return matrix_id.split(":", 1)[1]

    @classmethod
    def _capped_event_levels(cls, own_level):
        """Return a copy of ``ROOM_POWER_LEVEL_EVENTS`` capped at ``own_level``.

        A new dict is built so the class-level mapping is never modified.
        """
        return {
            event_type: min(required, own_level)
            for event_type, required in cls.ROOM_POWER_LEVEL_EVENTS.items()
        }

    async def create_room(self, name, alias, joinable_for_space_members=False, suggested=False):
        """Create a private, non-federated room and attach it to the space.

        Args:
            name: Room name.
            alias: Room alias passed to ``room_create``.
            joinable_for_space_members: When true, add a ``restricted`` join
                rule that allows members of the space, plus guest access.
            suggested: Value of the ``suggested`` flag in the space's
                ``m.space.child`` event.

        Returns:
            The response object returned by ``room_create``.

        Raises:
            RuntimeError: If the room could not be created or could not be
                registered as a child of the space.
        """
        # we not only need to create the room but also have to add it to the space
        via_domain = self._server_name_of(self._space_id)
        initial_state = [
            {
                "type": "m.space.parent",
                "state_key": self._space_id,
                "content": {
                    "canonical": True,
                    "via": [via_domain],
                }
            },
            {
                "type": "m.room.history_visibility",
                "content": {"history_visibility": "shared"}
            },
            {
                "type": "m.room.power_levels",
                "content": {
                    # Copy so the request can never alias the class constant.
                    "events": dict(MatrixHelper.ROOM_POWER_LEVEL_EVENTS),
                    "invite": 50,
                    "users": {
                        self._matrix_username: 200,
                    }
                }
            }
        ]

        if joinable_for_space_members:
            initial_state.insert(0, {
                "type": "m.room.guest_access",
                "state_key": "",
                "content": {"guest_access": "can_join"}
            })
            initial_state.insert(2, {
                "type": "m.room.join_rules",
                "content": {
                    "join_rule": "restricted",
                    "allow": [
                        {
                            "type": "m.room_membership",
                            "room_id": self._space_id
                        }
                    ]
                }
            })

        room = await self._client.room_create(visibility=nio.RoomVisibility.private, name=name, alias=alias, federate=False, initial_state=initial_state)
        # Explicit check instead of assert: asserts vanish under ``python -O``.
        if getattr(room, "room_id", None) is None:
            raise RuntimeError(f"Could not create room {alias!r}: {room}")

        # add as child room
        state_update = await self._client.room_put_state(self._space_id, "m.space.child", {
            "suggested": suggested,
            "via": [via_domain],
        }, state_key=room.room_id)
        if getattr(state_update, "event_id", None) is None:
            raise RuntimeError(
                f"Could not add room {room.room_id} to space {self._space_id}: {state_update}")

        return room

    async def get_rooms(self):
        """Return the admin API room listing (limit 200, ordered by joined local members)."""
        return self._admin_client.room.lists(limit=200, recent_first=False, orderby="joined_local_members")

    async def get_users(self):
        """Return the admin API user listing."""
        return self._admin_client.user.lists()

    async def get_rooms_of_user(self, username):
        """Return the admin API result for the rooms ``username`` has joined."""
        return self._admin_client.user.joined_room(username)

    async def add_user_to_room(self, user, room):
        """Join ``user`` to ``room`` through the admin API and return its result."""
        return self._admin_client.user.join_room(user, room)

    async def remove_user_from_room(self, user, room):
        """Kick ``user`` from ``room`` with a fixed explanatory message."""
        # TODO: Ban users?
        return await self._client.room_kick(room, user, "You have been removed automatically since you are no longer a member of the group associated with this room. If you think, this is an error, please contact us.")

    async def get_room_power_levels(self, room):
        """Return the ``users`` mapping of the room's ``m.room.power_levels`` event."""
        current_state = await self._client.room_get_state_event(room, "m.room.power_levels")
        return current_state.content['users']

    async def set_user_power_level_in_room(self, user, room, power_level):
        """Set the power level of ``user`` in ``room``.

        The bot's own level is never changed and only the levels 0, 50 and 100
        are accepted. If the bot has less than level 100 in the room it first
        makes itself room admin through the admin API.

        Args:
            user: Key of the user in the power-level ``users`` mapping.
            room: Room ID.
            power_level: Requested level; must be 0, 50 or 100.

        Returns:
            True if the new power levels were stored, False if the request was
            refused or the server reported an error.
        """
        if user == self._matrix_username:
            return False
        if power_level not in self._ALLOWED_POWER_LEVELS:
            return False

        users = await self.get_room_power_levels(room)
        if self._matrix_username not in users or users[self._matrix_username] < 100:
            self._admin_client.room.set_admin(room, self._matrix_username)
            users = await self.get_room_power_levels(room)

        own_level = users.get(self._matrix_username)
        if own_level is None:
            # Without an own entry the event requirements cannot be capped.
            logging.error(f"Error while setting power level of user {user} in room {room} to {power_level}")
            logging.error(f"Current power levels: {users}")
            return False

        users[user] = power_level

        # Event requirements are capped at the bot's own level (presumably so
        # the bot is allowed to send the update - confirm). This is a per-call
        # copy; the class-level defaults stay untouched.
        room_power_level_events = self._capped_event_levels(own_level)

        # NOTE(review): only events/invite/users are sent, so any other keys of
        # the existing power-levels content are not carried over.
        result = await self._client.room_put_state(room, "m.room.power_levels", {
            "events": room_power_level_events,
            "invite": 50,
            "users": users
        })

        if isinstance(result, nio.ErrorResponse):
            logging.error(f"Error while setting power level of user {user} in room {room} to {power_level}")
            logging.error(result)
            logging.error(f"Current power levels: {users}")
            return False

        return True
|