import base64
from urllib.parse import quote, unquote, urlsplit

import aiohttp
import defusedxml.ElementTree as ET

from .base import AbstractStorageBackend

# XML namespace map used when walking a PROPFIND multistatus response.
_DAV_NS = {"d": "DAV:"}

# Ask only for `resourcetype`: it is the single property list_keys() reads
# (to tell collections from files), so there is no need for an allprop reply.
_PROPFIND_RESOURCETYPE = (
    '<?xml version="1.0" encoding="utf-8"?>'
    '<d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>'
)


class WebDAVBackend(AbstractStorageBackend):
    """
    WebDAV backend. Compatible with Nextcloud and any standard WebDAV server.

    root_path should be the WebDAV root on the server,
    e.g. '/remote.php/dav/files/username'.

    Objects are addressed as ``<url><root_path>/<bucket>/<key>``; a ``/`` in a
    key is treated as a collection (directory) separator.
    """

    def __init__(
        self,
        url: str,
        username: str,
        password: str,
        root_path: str = "/",
    ) -> None:
        """Store the server location and precompute the Basic auth header.

        Args:
            url: Server base URL; trailing slashes are dropped.
            username: Account name used for HTTP Basic authentication.
            password: Password used for HTTP Basic authentication.
            root_path: Path of the WebDAV root below ``url``. Leading and
                trailing slashes are normalized, so ``"dav"``, ``"/dav"`` and
                ``"/dav/"`` are equivalent and ``"/"`` means "no prefix".
        """
        self._base = url.rstrip("/")
        # Normalize so the root is either "" or "/segment[/segment...]";
        # without the leading slash it would be glued onto the host name.
        trimmed_root = root_path.strip("/")
        self._root = f"/{trimmed_root}" if trimmed_root else ""
        creds = base64.b64encode(f"{username}:{password}".encode()).decode()
        self._auth = f"Basic {creds}"

    @property
    def driver_name(self) -> str:
        """Return the identifier of this backend: ``"webdav"``."""
        return "webdav"

    def _url(self, *parts: str) -> str:
        """Build the absolute URL for the given path parts.

        Each part is percent-encoded, but ``/`` is kept literal so that a
        hierarchical key such as ``"a/b.txt"`` maps onto the collection ``a``
        that put() creates, instead of a single segment ``a%2Fb.txt``.
        """
        encoded = "/".join(quote(part, safe="/") for part in parts)
        return f"{self._base}{self._root}/{encoded}"

    def _headers(self, extra: dict[str, str] | None = None) -> dict[str, str]:
        """Return the Authorization header merged with any ``extra`` headers."""
        headers = {"Authorization": self._auth}
        if extra:
            headers.update(extra)
        return headers

    async def _ensure_collection(self, session: aiohttp.ClientSession, *parts: str) -> None:
        """Issue a best-effort MKCOL for the collection named by ``parts``.

        The response status is deliberately ignored: a server answers 405 when
        the collection already exists, and if the collection is truly missing
        the subsequent PUT fails and reports the error.
        """
        url = self._url(*parts)
        async with session.request("MKCOL", url, headers=self._headers()):
            pass  # best-effort; PUT will fail if directory is truly missing

    async def put(self, bucket: str, key: str, data: bytes) -> None:
        """Upload ``data`` to ``bucket/key``, creating parent collections first.

        Raises:
            OSError: If the PUT response status is not 200, 201 or 204.
        """
        async with aiohttp.ClientSession() as session:
            await self._ensure_collection(session, bucket)
            # Create every intermediate collection of the key, outermost first.
            parts = key.split("/")
            for depth in range(1, len(parts)):
                await self._ensure_collection(session, bucket, *parts[:depth])
            url = self._url(bucket, key)
            async with session.put(url, data=data, headers=self._headers()) as resp:
                if resp.status not in (200, 201, 204):
                    raise OSError(f"WebDAV PUT {url} → {resp.status}")

    async def get(self, bucket: str, key: str) -> bytes:
        """Download and return the content stored at ``bucket/key``.

        Raises:
            KeyError: If the server answers 404.
            OSError: If the server answers with any other non-200 status.
        """
        async with aiohttp.ClientSession() as session:
            url = self._url(bucket, key)
            async with session.get(url, headers=self._headers()) as resp:
                if resp.status == 404:
                    raise KeyError(f"{bucket}/{key}")
                if resp.status != 200:
                    raise OSError(f"WebDAV GET {url} → {resp.status}")
                return await resp.read()

    async def delete(self, bucket: str, key: str) -> None:
        """Delete ``bucket/key``; a 404 (already absent) is not an error.

        Raises:
            OSError: If the response status is not 200, 204 or 404.
        """
        async with aiohttp.ClientSession() as session:
            url = self._url(bucket, key)
            async with session.delete(url, headers=self._headers()) as resp:
                if resp.status not in (200, 204, 404):
                    raise OSError(f"WebDAV DELETE {url} → {resp.status}")

    async def list_keys(self, bucket: str) -> list[str]:
        """Return the keys of all non-collection resources below ``bucket``.

        Keys are returned percent-decoded and relative to the bucket, so each
        one can be passed straight back to get(), delete() or exists().

        Returns:
            The list of keys; empty if the bucket does not exist (404).

        Raises:
            OSError: If the server answers with a status other than 207/404,
                or if the multistatus body is not well-formed XML. Failing
                loudly keeps an auth or server error from looking like an
                empty bucket.
        """
        url = self._url(bucket)
        headers = self._headers({"Depth": "infinity", "Content-Type": "application/xml"})
        async with aiohttp.ClientSession() as session:
            async with session.request(
                "PROPFIND", url, headers=headers, data=_PROPFIND_RESOURCETYPE
            ) as resp:
                if resp.status == 404:
                    return []
                if resp.status != 207:
                    raise OSError(f"WebDAV PROPFIND {url} → {resp.status}")
                xml_body = await resp.text()
        return self._keys_from_multistatus(xml_body, bucket)

    def _keys_from_multistatus(self, xml_body: str, bucket: str) -> list[str]:
        """Extract bucket-relative file keys from a PROPFIND multistatus body."""
        try:
            root = ET.fromstring(xml_body)
        except ET.ParseError as err:
            raise OSError("WebDAV PROPFIND returned malformed XML") from err

        # Servers may report hrefs either as absolute URLs or as absolute
        # paths, and may percent-encode differently than quote() does, so
        # compare decoded *paths* rather than raw URL strings.
        root_path = unquote(f"{urlsplit(self._base).path}{self._root}")
        prefix = f"{root_path}/{bucket}/"

        keys: list[str] = []
        for response in root.findall("d:response", _DAV_NS):
            if response.find(".//d:resourcetype/d:collection", _DAV_NS) is not None:
                continue  # skip directories
            href = response.findtext("d:href", default="", namespaces=_DAV_NS) or ""
            path = unquote(urlsplit(href.strip()).path)
            if not path.startswith(prefix):
                continue
            key = path[len(prefix):]
            if key:  # an empty remainder is the bucket collection itself
                keys.append(key)
        return keys

    async def exists(self, bucket: str, key: str) -> bool:
        """Return True if a HEAD request for ``bucket/key`` answers 200."""
        async with aiohttp.ClientSession() as session:
            url = self._url(bucket, key)
            async with session.request("HEAD", url, headers=self._headers()) as resp:
                return resp.status == 200

    async def test_connection(self) -> None:
        """Check that the WebDAV root answers a depth-0 PROPFIND.

        Raises:
            ConnectionError: If the response status is neither 200 nor 207.
        """
        async with aiohttp.ClientSession() as session:
            root_url = f"{self._base}{self._root}/"
            headers = self._headers({"Depth": "0"})
            async with session.request("PROPFIND", root_url, headers=headers) as resp:
                if resp.status not in (200, 207):
                    raise ConnectionError(f"WebDAV root PROPFIND → {resp.status}")