- commit
- 337592899179aa1d399bdeddb25db3d3695d882c
- parent
- b7a41a2b89d25c39c59db3315f99be7ff0dfb4c4
- Author
- Tobias Bengfort <tobias.bengfort@posteo.de>
- Date
- 2026-05-24 11:38
Merge pull request #7 from xi/sandbox-root2 anonymous namespacing (version 2)
Diffstat
| D | xikeyring/app_id.py | 21 | --------------------- |
| M | xikeyring/dbus.py | 68 | +++++++++++++++++++++++++++++++------------------------------ |
| M | xikeyring/keyring.py | 104 | ++++++++++++++++++++++++++++++++----------------------------- |
| A | xikeyring/pidfd.py | 28 | ++++++++++++++++++++++++++++ |
4 files changed, 117 insertions, 104 deletions
diff --git a/xikeyring/app_id.py b/xikeyring/app_id.py
@@ -1,21 +0,0 @@1 -1 import configparser2 -1 import selectors3 -1 from pathlib import Path4 -15 -16 -1 def get_app_id(pid: int, pidfd: int) -> str:7 -1 path = Path('/proc') / str(pid) / 'root' / '.flatpak-info'8 -1 config = configparser.ConfigParser()9 -1 try:10 -1 with path.open() as fh:11 -1 config.read_file(fh)12 -1 app_id = config['Application']['name']13 -1 except Exception:14 -1 return ''15 -116 -1 with selectors.DefaultSelector() as sel:17 -1 sel.register(pidfd, selectors.EVENT_READ)18 -1 if sel.select(0) != []:19 -1 raise ValueError('Calling process has quit')20 -121 -1 return app_id
diff --git a/xikeyring/dbus.py b/xikeyring/dbus.py
@@ -8,10 +8,10 @@ import gi 8 8 from gi.repository import Gio 9 9 from gi.repository import GLib 10 1011 -1 from .app_id import get_app_id12 11 from .dbus_sessions import create_session 13 12 from .keyring import AccessDeniedError 14 13 from .keyring import NotFoundError -1 14 from .pidfd import PID 15 15 16 16 OFSP = '/org/freedesktop/secrets' 17 17 OFSI = 'org.freedesktop.Secret' @@ -115,7 +115,7 @@ class BaseDBusService: 115 115 self._call(conn, sender, path, iface, f'Set{prop}', [value], error) 116 116 return True 117 117118 -1 def get_app_id(self, conn, sender) -> str:-1 118 def get_pid(self, conn, sender) -> PID: 119 119 (cred,), fds = conn.call_with_unix_fd_list_sync( 120 120 'org.freedesktop.DBus', 121 121 '/org/freedesktop/DBus', @@ -128,7 +128,7 @@ class BaseDBusService: 128 128 Gio.UnixFDList(), 129 129 None, 130 130 )131 -1 return get_app_id(cred['ProcessID'], fds.get(cred['ProcessFD']))-1 131 return PID(cred['ProcessID'], fds.get(cred['ProcessFD'])) 132 132 133 133 134 134 class DBusService(BaseDBusService): @@ -185,8 +185,8 @@ class DBusService(BaseDBusService): 185 185 ), 186 186 ) 187 187188 -1 def search_items(self, app_id, conn, query={}):189 -1 items = self.keyring.search_items(app_id, query)-1 188 def search_items(self, pid, conn, query={}): -1 189 items = self.keyring.search_items(pid, query) 190 190 self.update_items(conn, add=items, emit=False) 191 191 return items 192 192 @@ -231,8 +231,8 @@ class DBusService(BaseDBusService): 231 231 self.session_close(conn, sender, path) 232 232 233 233 def service_search_items(self, conn, sender, path, query):234 -1 app_id = self.get_app_id(conn, sender)235 -1 items = self.search_items(app_id, conn, query)-1 234 pid = self.get_pid(conn, sender) -1 235 items = self.search_items(pid, conn, query) 236 236 return GLib.Variant('(aoao)', (self.ids_to_paths(items), [])) 237 237 238 238 def service_unlock(self, conn, sender, path, objects): @@ -244,11 +244,11 @@ class DBusService(BaseDBusService): 244 244 245 245 def service_get_secrets(self, conn, sender, path, items, session_path): 246 246 session = self.sessions[session_path][2]247 -1 app_id = self.get_app_id(conn, sender)-1 247 pid = self.get_pid(conn, sender) 248 248 result = [] 249 249 for path in items: 250 250 id = int(path.rsplit('/', 1)[1], 10)251 -1 secret = self.keyring.get_secret(app_id, id)-1 251 secret = self.keyring.get_secret(pid, id) 252 252 secret_tuple = session.encode(session_path, secret) 253 253 result.append((path, secret_tuple)) 254 254 return GLib.Variant('(a{o(oayays)})', [result]) @@ -263,8 +263,8 @@ class DBusService(BaseDBusService): 263 263 return GLib.Variant('ao', [f'{OFSP}/collection/it']) 264 264 265 265 def collection_search_items(self, conn, sender, path, query):266 -1 app_id = self.get_app_id(conn, sender)267 -1 items = self.search_items(app_id, conn, query)-1 266 pid = self.get_pid(conn, sender) -1 267 items = self.search_items(pid, conn, query) 268 268 return GLib.Variant('(ao)', [self.ids_to_paths(items)]) 269 269 270 270 def collection_create_item( @@ -273,21 +273,21 @@ class DBusService(BaseDBusService): 273 273 session = self.sessions[secret_tuple[0]][2] 274 274 secret = session.decode(secret_tuple) 275 275 attributes = properties.get(f'{OFSI}.Item.Attributes', {})276 -1 app_id = self.get_app_id(conn, sender)-1 276 pid = self.get_pid(conn, sender) 277 277 id = None 278 278 if replace:279 -1 matches = self.search_items(app_id, conn, attributes)-1 279 matches = self.search_items(pid, conn, attributes) 280 280 if matches: 281 281 id = matches[0]282 -1 self.keyring.update_secret(app_id, id, secret)-1 282 self.keyring.update_secret(pid, id, secret) 283 283 if not id:284 -1 id = self.keyring.create_item(app_id, attributes, secret)-1 284 id = self.keyring.create_item(pid, attributes, secret) 285 285 self.update_items(conn, add=[id]) 286 286 return GLib.Variant('(oo)', (f'{OFSP}/collection/it/{id}', '/')) 287 287 288 288 def collection_get_items(self, conn, sender, path):289 -1 app_id = self.get_app_id(conn, sender)290 -1 items = self.search_items(app_id, conn)-1 289 pid = self.get_pid(conn, sender) -1 290 items = self.search_items(pid, conn) 291 291 return GLib.Variant('ao', self.ids_to_paths(items)) 292 292 293 293 def collection_get_label(self, conn, sender, path): @@ -304,15 +304,15 @@ class DBusService(BaseDBusService): 304 304 305 305 def item_delete(self, conn, sender, path): 306 306 id = int(path.rsplit('/', 1)[1], 10)307 -1 app_id = self.get_app_id(conn, sender)308 -1 self.keyring.delete_item(app_id, id)-1 307 pid = self.get_pid(conn, sender) -1 308 self.keyring.delete_item(pid, id) 309 309 self.update_items(conn, rm=[id]) 310 310 return GLib.Variant('(o)', ['/']) 311 311 312 312 def item_get_secret(self, conn, sender, path, session_path): 313 313 id = int(path.rsplit('/', 1)[1], 10)314 -1 app_id = self.get_app_id(conn, sender)315 -1 secret = self.keyring.get_secret(app_id, id)-1 314 pid = self.get_pid(conn, sender) -1 315 secret = self.keyring.get_secret(pid, id) 316 316 session = self.sessions[session_path][2] 317 317 secret_tuple = session.encode(session_path, secret) 318 318 return GLib.Variant('((oayays))', [secret_tuple]) @@ -321,8 +321,8 @@ class DBusService(BaseDBusService): 321 321 id = int(path.rsplit('/', 1)[1], 10) 322 322 session = self.sessions[secret_tuple[0]][2] 323 323 secret = session.decode(secret_tuple)324 -1 app_id = self.get_app_id(conn, sender)325 -1 self.keyring.update_secret(app_id, id, secret)-1 324 pid = self.get_pid(conn, sender) -1 325 self.keyring.update_secret(pid, id, secret) 326 326 327 327 def item_get_label(self, conn, sender, path): 328 328 return GLib.Variant('s', path.rsplit('/', 1)[1]) @@ -341,15 +341,15 @@ class DBusService(BaseDBusService): 341 341 342 342 def item_get_attributes(self, conn, sender, path): 343 343 id = int(path.rsplit('/', 1)[1], 10)344 -1 app_id = self.get_app_id(conn, sender)345 -1 attributes = self.keyring.get_attributes(app_id, id)-1 344 pid = self.get_pid(conn, sender) -1 345 attributes = self.keyring.get_attributes(pid, id) 346 346 return GLib.Variant('a{ss}', attributes.items()) 347 347 348 348 def item_set_attributes(self, conn, sender, path, value): 349 349 id = int(path.rsplit('/', 1)[1], 10) 350 350 attributes = value.unpack()351 -1 app_id = self.get_app_id(conn, sender)352 -1 self.keyring.update_attributes(app_id, id, attributes)-1 351 pid = self.get_pid(conn, sender) -1 352 self.keyring.update_attributes(pid, id, attributes) 353 353 354 354 conn.emit_signal( 355 355 None, @@ -380,15 +380,17 @@ class DBusService(BaseDBusService): 380 380 conn, handle, 'org.freedesktop.impl.portal.Request' 381 381 ) 382 382 try:383 -1 if self.get_app_id(conn, sender):384 -1 raise AccessDeniedError385 -1 attrs = {'application': 'org.freedesktop.portal.Secret'}386 -1 ids = self.keyring.search_items(app_id, attrs)-1 383 pid = self.get_pid(conn, sender) -1 384 attrs = { -1 385 'application': 'org.freedesktop.portal.Secret', -1 386 'app_id': app_id, -1 387 } -1 388 ids = self.keyring.search_items(pid, attrs) 387 389 if ids:388 -1 secret = self.keyring.get_secret(app_id, ids[0])-1 390 secret = self.keyring.get_secret(pid, ids[0]) 389 391 else: 390 392 secret = os.urandom(64)391 -1 self.keyring.create_item(app_id, attrs, secret)-1 393 self.keyring.create_item(pid, attrs, secret) 392 394 os.write(fd, secret) 393 395 os.close(fd) 394 396 finally:
diff --git a/xikeyring/keyring.py b/xikeyring/keyring.py
@@ -9,6 +9,7 @@ from cryptography.fernet import InvalidToken 9 9 10 10 from . import crypto 11 11 from .kernel_keyring import KernelKey -1 12 from .pidfd import PID 12 13 from .prompt import PinentryPrompt as Prompt 13 14 14 15 @@ -24,12 +25,13 @@ class NotFoundError(Exception): 24 25 class Item: 25 26 secret: bytes 26 27 attributes: dict[str, str]27 -1 app_id: str28 28 29 2930 -1 def write_bytes(path: Path, data: bytes) -> int:-1 30 def write_bytes(path: Path, data: bytes, pid: PID | None = None) -> int: 31 31 flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC 32 32 fd = os.open(path, flags, mode=0o600) -1 33 if pid: -1 34 pid.check_active() 33 35 try: 34 36 return os.write(fd, data) 35 37 finally: @@ -87,95 +89,97 @@ class Keyring: 87 89 write_bytes(path, encrypted) 88 90 return KernelKey(key) 89 9190 -1 def _read(self) -> dict[int, Item]:91 -1 if not self.path.exists():-1 92 def _read(self, pid: PID) -> dict[int, Item]: -1 93 path = pid.path(self.path) -1 94 if not path.exists(): 92 95 return {} 93 9694 -1 encrypted = self.path.read_bytes()-1 97 encrypted = path.read_bytes() -1 98 pid.check_active() 95 99 decrypted = Fernet(self.key.value).decrypt(encrypted) 96 100 raw = json.loads(decrypted) 97 101 return {98 -1 id: Item(base64.urlsafe_b64decode(secret), attributes, app_id)99 -1 for id, secret, attributes, app_id in raw-1 102 id: Item(base64.urlsafe_b64decode(secret), attributes) -1 103 for id, secret, attributes in raw 100 104 } 101 105102 -1 def _write(self, items: dict[int, Item]):-1 106 def _write(self, pid: PID, items: dict[int, Item]): -1 107 path = pid.path(self.path) -1 108 if not path.parent.exists(): -1 109 # Raise an error instead of creating the directory because this -1 110 # might be a tmpfs. -1 111 raise NotFoundError -1 112 103 113 raw = [ 104 114 ( 105 115 id, 106 116 base64.urlsafe_b64encode(item.secret).decode(), 107 117 item.attributes,108 -1 item.app_id,109 118 ) 110 119 for id, item in items.items() 111 120 ] 112 121 decrypted = json.dumps(raw).encode('utf-8') 113 122 encrypted = Fernet(self.key.value).encrypt(decrypted)114 -1 write_bytes(self.path, encrypted)-1 123 write_bytes(path, encrypted, pid) 115 124116 -1 def confirm_access(self, app_id: str) -> None:117 -1 if not self.prompt.confirm(f'Allow {app_id or "host"} to access a secret from your keyring?'):-1 125 def confirm_access(self) -> None: -1 126 if not self.prompt.confirm('Allow access to a secret from your keyring?'): 118 127 raise AccessDeniedError 119 128120 -1 def confirm_change(self, app_id: str) -> None:121 -1 if not self.prompt.confirm(f'Allow {app_id or "host"} to make changes to your keyring?'):-1 129 def confirm_change(self) -> None: -1 130 if not self.prompt.confirm('Allow changes to your keyring?'): 122 131 raise AccessDeniedError 123 132124 -1 def get(self, items: dict[int, Item], app_id: str, id: int) -> Item:-1 133 def get(self, items: dict[int, Item], id: int) -> Item: 125 134 try:126 -1 item = items[id]-1 135 return items[id] 127 136 except KeyError as e: 128 137 raise NotFoundError from e129 -1 if item.app_id != app_id:130 -1 raise NotFoundError131 -1 return item132 138133 -1 def search_items(self, app_id: str, query: dict[str, str] = {}) -> list[int]:134 -1 items = self._read()-1 139 def search_items(self, pid: PID, query: dict[str, str] = {}) -> list[int]: -1 140 items = self._read(pid) 135 141 return [ 136 142 id for id, item in items.items()137 -1 if item.app_id == app_id and all(138 -1 item.attributes.get(key) == value for key, value in query.items()139 -1 )-1 143 if all(item.attributes.get(k) == v for k, v in query.items()) 140 144 ] 141 145142 -1 def get_attributes(self, app_id: str, id: int) -> dict[str, str]:143 -1 items = self._read()144 -1 return self.get(items, app_id, id).attributes-1 146 def get_attributes(self, pid: PID, id: int) -> dict[str, str]: -1 147 items = self._read(pid) -1 148 return self.get(items, id).attributes 145 149146 -1 def get_secret(self, app_id: str, id: int) -> bytes:147 -1 items = self._read()148 -1 item = self.get(items, app_id, id)149 -1 self.confirm_access(app_id)-1 150 def get_secret(self, pid: PID, id: int) -> bytes: -1 151 items = self._read(pid) -1 152 item = self.get(items, id) -1 153 self.confirm_access() 150 154 return item.secret 151 155152 -1 def create_item(self, app_id: str, attributes: dict[str, str], secret: bytes) -> int:153 -1 items = self._read()-1 156 def create_item(self, pid: PID, attributes: dict[str, str], secret: bytes) -> int: -1 157 items = self._read(pid) 154 158 id = max(items.keys(), default=0) + 1155 -1 items[id] = Item(secret, attributes, app_id)156 -1 self._write(items)-1 159 items[id] = Item(secret, attributes) -1 160 self._write(pid, items) 157 161 return id 158 162159 -1 def update_attributes(self, app_id: str, id: int, attributes: dict[str, str]) -> None:160 -1 items = self._read()161 -1 item = self.get(items, app_id, id)162 -1 self.confirm_change(app_id)-1 163 def update_attributes(self, pid: PID, id: int, attributes: dict[str, str]) -> None: -1 164 items = self._read(pid) -1 165 item = self.get(items, id) -1 166 self.confirm_change() 163 167 item.attributes = attributes164 -1 self._write(items)-1 168 self._write(pid, items) 165 169166 -1 def update_secret(self, app_id: str, id: int, secret: bytes) -> None:167 -1 items = self._read()168 -1 item = self.get(items, app_id, id)169 -1 self.confirm_change(app_id)-1 170 def update_secret(self, pid: PID, id: int, secret: bytes) -> None: -1 171 items = self._read(pid) -1 172 item = self.get(items, id) -1 173 self.confirm_change() 170 174 item.secret = secret171 -1 self._write(items)-1 175 self._write(pid, items) 172 176173 -1 def delete_item(self, app_id: str, id: int) -> None:174 -1 items = self._read()175 -1 self.get(items, app_id, id) # trigger appropriate exceptions176 -1 self.confirm_change(app_id)-1 177 def delete_item(self, pid: PID, id: int) -> None: -1 178 items = self._read(pid) -1 179 self.get(items, id) # trigger appropriate exceptions -1 180 self.confirm_change() 177 181 del items[id]178 -1 self._write(items)-1 182 self._write(pid, items) 179 183 180 184 181 185 class KeyringProxy:
diff --git a/xikeyring/pidfd.py b/xikeyring/pidfd.py
@@ -0,0 +1,28 @@
-1 1 import selectors
-1 2 from pathlib import Path
-1 3
-1 4
-1 5 class PID:
-1 6 def __init__(self, pid: int, pidfd: int):
-1 7 self.pid = pid
-1 8 self.pidfd = pidfd
-1 9
-1 10 def check_active(self) -> None:
-1 11 with selectors.DefaultSelector() as sel:
-1 12 sel.register(self.pidfd, selectors.EVENT_READ)
-1 13 if sel.select(0) != []:
-1 14 raise ValueError('Calling process has quit')
-1 15
-1 16 def path(self, path: str | Path) -> Path:
-1 17 root = (Path('/proc') / str(self.pid) / 'root').resolve()
-1 18 rel_path = Path(path).absolute().relative_to('/')
-1 19 result = (root / rel_path).resolve()
-1 20
-1 21 # FIXME: symlinks are resoled relative to the host.
-1 22 #
-1 23 # A proper fix would involve openat2()
-1 24 # (see https://github.com/python/cpython/issues/141878).
-1 25 if root not in result.parents:
-1 26 raise ValueError('path escapes mount namespace')
-1 27
-1 28 return result