matrix-bot/src/matrixHelper.py

91 lines
2.8 KiB
Python

from dataclasses import dataclass, field
from typing import Dict, Any, Optional, Sequence
import logging
import nio
class Schemas:
    """Namespace for the JSON schemas this module validates payloads against."""

    # Accepts any JSON object that carries a string-valued "event_id" key;
    # the schema places no constraint on other keys.
    room_state: Dict[str, Any] = {
        "type": "object",
        "properties": {
            "event_id": {"type": "string"},
        },
        "required": ["event_id"],
    }
@dataclass
class RoomStateResponse(nio.responses.Response):
    """nio response type whose payload is a single ``event_id`` string.

    Instances are normally built through :meth:`from_dict`, which validates
    the payload against ``Schemas.room_state`` first.
    """

    # Required field, filled from the "event_id" key of the parsed payload.
    event_id: str = field()

    @staticmethod
    def create_error(parsed_dict):
        """Turn *parsed_dict* into a generic ``nio.responses.ErrorResponse``."""
        return nio.responses.ErrorResponse.from_dict(parsed_dict)

    @classmethod
    def from_dict(cls, parsed_dict: Dict[Any, Any]):
        """Build a response object from an already-parsed JSON payload.

        Args:
            parsed_dict: The decoded JSON body to validate and convert.

        Returns:
            A ``RoomStateResponse`` when the payload matches
            ``Schemas.room_state``; otherwise whatever :meth:`create_error`
            produces for the same payload.
        """
        # NOTE(review): relies on ``validate_json``, ``SchemaError`` and
        # ``ValidationError`` being reachable as attributes of the top-level
        # ``nio`` package -- confirm against the installed nio version.
        try:
            nio.validate_json(parsed_dict, Schemas.room_state)
        except (nio.SchemaError, nio.ValidationError):
            return cls.create_error(parsed_dict)
        else:
            return cls(event_id=parsed_dict["event_id"])
class MatrixHelper:
    """Async helper around ``nio.AsyncClient`` for a bot that creates rooms in one space.

    The helper logs the bot in, checks that it is a member of the configured
    space, and creates private rooms that point at that space as their parent.
    """

    def __init__(self, server, username, space_id: str) -> None:
        """Create the underlying client.

        Args:
            server: Homeserver passed straight to ``nio.AsyncClient``.
            username: User passed straight to ``nio.AsyncClient``.
            space_id: Room ID of the space new rooms are attached to. It must
                contain a ``:`` followed by the server name, because that
                server name is used as the ``via`` hint when creating rooms.
        """
        self._client = nio.AsyncClient(server, username)
        self._loggedIn = False
        self._space_id = space_id

    async def login(self, password):
        """Log in with *password* and verify the bot is a member of the space.

        Args:
            password: Password handed to ``AsyncClient.login``.

        Returns:
            ``True`` when the login succeeded and the configured space is
            among the bot's joined rooms. ``False`` when the login was
            rejected, the joined rooms could not be fetched, the bot is not
            in the space, or any exception was raised along the way.
        """
        try:
            response = await self._client.login(password)
            # nio hands back an ErrorResponse subclass instead of raising when
            # the server rejects the request, so the result must be inspected
            # before the session is treated as logged in.
            if isinstance(response, nio.responses.ErrorResponse):
                logging.error("Error while logging into matrix server! %s", response)
                return False
            self._loggedIn = True

            display_name = await self._client.get_displayname()
            logging.info("Logged into Matrix as %s", display_name)

            rooms = await self._client.joined_rooms()
            if isinstance(rooms, nio.responses.ErrorResponse):
                # An error response has no ``rooms`` attribute to search.
                logging.error("Could not fetch the rooms joined by the bot user! %s", rooms)
                return False
            if self._space_id not in rooms.rooms:
                logging.error("The bot user is not in the space!")
                return False
            return True
        except Exception:
            # Boundary handler: keep the "return False on failure" contract
            # but record the traceback instead of discarding the cause.
            logging.exception("Error while logging into matrix server!")
            return False

    async def logout(self):
        """End the session and close the underlying HTTP client.

        If a login succeeded earlier, the access token is invalidated on the
        server first (best effort). The HTTP session is closed in every case.
        """
        try:
            if self._loggedIn:
                response = await self._client.logout()
                if isinstance(response, nio.responses.ErrorResponse):
                    logging.warning("Could not log out of matrix server: %s", response)
        except Exception:
            # A failed server-side logout must not prevent closing the client.
            logging.exception("Error while logging out of matrix server!")
        finally:
            self._loggedIn = False
            await self._client.close()

    def _space_server_name(self) -> str:
        """Return the server-name part of the space ID (all text after the first ``:``)."""
        # Split on the FIRST colon only: a server name may itself contain a
        # colon when it carries a port (e.g. "example.org:8448").
        _, separator, server_name = self._space_id.partition(":")
        if not separator or not server_name:
            raise ValueError(
                f"Space id {self._space_id!r} does not contain a server name"
            )
        return server_name

    def _initial_room_state(self):
        """Build the initial state events for a room that belongs to the space."""
        return [
            {
                # Declare the space as the canonical parent of the new room.
                "type": "m.space.parent",
                "state_key": self._space_id,
                "content": {
                    "canonical": True,
                    "via": [self._space_server_name()],
                },
            },
            {
                # Only members of the space may join without an invite.
                "type": "m.room.join_rules",
                "content": {
                    "join_rule": "restricted",
                    "allow": [
                        {
                            "type": "m.room_membership",
                            "room_id": self._space_id,
                        }
                    ],
                },
            },
            {
                "type": "m.room.history_visibility",
                "content": {"history_visibility": "invited"},
            },
        ]

    async def createRoom(self, name, alias):
        """Create a private, non-federated room attached to the configured space.

        Args:
            name: Room name passed to ``AsyncClient.room_create``.
            alias: Room alias passed to ``AsyncClient.room_create``.

        Returns:
            The object returned by ``AsyncClient.room_create``. When the
            server rejects the request this is an error response (logged
            here) rather than a response carrying ``room_id``.

        Raises:
            ValueError: If the configured space ID has no server-name part.
        """
        # We not only need to create the room but also have to add it to the
        # space, hence the extra initial state events.
        # NOTE(review): only the child-side ``m.space.parent`` event is set
        # here; no ``m.space.child`` event is sent to the space room in this
        # class -- confirm whether the space is expected to list the room.
        room = await self._client.room_create(
            visibility=nio.RoomVisibility.private,
            name=name,
            alias=alias,
            federate=False,
            initial_state=self._initial_room_state(),
        )
        if isinstance(room, nio.responses.ErrorResponse):
            # An error response has no ``room_id``; report it and hand it back
            # so the caller can inspect the failure.
            logging.error("Failed to create room with alias %s: %s", alias, room)
            return room
        logging.info(
            f"Created room {room.room_id} with alias {alias} and added it to space {self._space_id}")
        return room