260 lines
8.3 KiB
Python
260 lines
8.3 KiB
Python
|
#!/usr/bin/env python3
|
||
|
#
|
||
|
# Cross Platform and Multi Architecture Advanced Binary Emulation Framework
|
||
|
#
|
||
|
|
||
|
import os
|
||
|
from os import PathLike
|
||
|
from typing import Any, Callable, MutableMapping, Union
|
||
|
|
||
|
from .path import QlOsPath
|
||
|
from .filestruct import ql_file
|
||
|
|
||
|
QlPath = Union['PathLike[str]', str, 'PathLike[bytes]', bytes]
|
||
|
|
||
|
|
||
|
# All mapped objects should inherit this class.
|
||
|
# Note this object is compatible with ql_file.
|
||
|
# Q: Why not derive from ql_file directly?
|
||
|
# A: ql_file assumes that it holds a path with a corresponding fd, but
|
||
|
# a QlFsMappedObject doesn't have to be associated with a path or fd
|
||
|
# and thus the default implementation may cause unexpected behaviors.
|
||
|
# Simply let it crash if the method is not implemented.
|
||
|
#
|
||
|
# A quick way to create a QlFsMappedObject is `ql_file.open` or `open`.
|
||
|
class QlFsMappedObject:
|
||
|
def __init__(self):
|
||
|
pass
|
||
|
|
||
|
def read(self, expected_len):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: read")
|
||
|
|
||
|
def write(self, buffer):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: write")
|
||
|
|
||
|
def fileno(self):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: fileno")
|
||
|
|
||
|
def lseek(self, lseek_offset, lseek_origin):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: lseek")
|
||
|
|
||
|
def close(self):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: close")
|
||
|
|
||
|
def fstat(self):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: fstat")
|
||
|
|
||
|
def ioctl(self, ioctl_cmd, ioctl_arg):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: ioctl")
|
||
|
|
||
|
def tell(self):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: tell")
|
||
|
|
||
|
def dup(self):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: dup")
|
||
|
|
||
|
def readline(self, end = b'\n'):
|
||
|
raise NotImplementedError("QlFsMappedObject method not implemented: readline")
|
||
|
|
||
|
@property
|
||
|
def name(self):
|
||
|
raise NotImplementedError("QlFsMappedObject property not implemented: name")
|
||
|
|
||
|
|
||
|
class QlFsMapper:
|
||
|
|
||
|
def __init__(self, path: QlOsPath):
|
||
|
self._mapping: MutableMapping[str, Any] = {}
|
||
|
self.path = path
|
||
|
|
||
|
def __contains__(self, vpath: str) -> bool:
|
||
|
# canonicalize the path first
|
||
|
absvpath = self.path.virtual_abspath(vpath)
|
||
|
|
||
|
return absvpath in self._mapping
|
||
|
|
||
|
def has_mapping(self, vpath: str) -> bool:
|
||
|
"""Check whether a specific virtrual path has a binding.
|
||
|
|
||
|
Args:
|
||
|
vpath: virtual path name to check
|
||
|
|
||
|
Returns: `True` if the specified virtual path has been bound, `False` otherwise.
|
||
|
"""
|
||
|
|
||
|
return vpath in self
|
||
|
|
||
|
def __len__(self) -> int:
|
||
|
return len(self._mapping)
|
||
|
|
||
|
def mapping_count(self) -> int:
|
||
|
"""Count of currently existing bindings.
|
||
|
"""
|
||
|
|
||
|
return len(self)
|
||
|
|
||
|
def __open_mapped(self, absvpath: str, opener: Callable, *args) -> Any:
|
||
|
"""Internal method user for opening an existing mapped object.
|
||
|
|
||
|
Args:
|
||
|
absvpath: absolute virtual path name
|
||
|
opener: a method to use to open the target host path
|
||
|
*args: arguments to the opener method
|
||
|
"""
|
||
|
|
||
|
mapped = self._mapping[absvpath]
|
||
|
|
||
|
# mapped to a file name on the host file system
|
||
|
if isinstance(mapped, str):
|
||
|
obj = opener(mapped, *args)
|
||
|
|
||
|
# mapped to a class or a method
|
||
|
elif callable(mapped):
|
||
|
obj = mapped()
|
||
|
|
||
|
# mapped to another kind of object
|
||
|
else:
|
||
|
obj = mapped
|
||
|
|
||
|
return obj
|
||
|
|
||
|
def __open_new(self, absvpath: str, opener: Callable, *args) -> Any:
|
||
|
hpath = self.path.virtual_to_host_path(absvpath)
|
||
|
|
||
|
if not self.path.is_safe_host_path(hpath):
|
||
|
raise PermissionError(f'unsafe path: {hpath}')
|
||
|
|
||
|
return opener(hpath, *args)
|
||
|
|
||
|
def open_ql_file(self, vpath: str, flags: int, mode: int):
|
||
|
absvpath = self.path.virtual_abspath(vpath)
|
||
|
opener = self.__open_mapped if self.has_mapping(absvpath) else self.__open_new
|
||
|
|
||
|
return opener(absvpath, ql_file.open, flags, mode)
|
||
|
|
||
|
def open(self, vpath: str, mode: str):
|
||
|
absvpath = self.path.virtual_abspath(vpath)
|
||
|
opener = self.__open_mapped if self.has_mapping(absvpath) else self.__open_new
|
||
|
|
||
|
return opener(absvpath, open, mode)
|
||
|
|
||
|
def file_exists(self, vpath: str) -> bool:
|
||
|
"""Check whether a file exists on the virtual file system.
|
||
|
|
||
|
Args:
|
||
|
vpath: virtual path name to check
|
||
|
|
||
|
Returns: `True` if the specified virtual path has an existing mapping or
|
||
|
resolves to an existing file on the virtual file system. `False` otherwise.
|
||
|
"""
|
||
|
|
||
|
if self.has_mapping(vpath):
|
||
|
return True
|
||
|
|
||
|
hpath = self.path.virtual_to_host_path(vpath)
|
||
|
|
||
|
if not self.path.is_safe_host_path(hpath):
|
||
|
raise PermissionError(f'unsafe path: {hpath}')
|
||
|
|
||
|
return os.path.isfile(hpath)
|
||
|
|
||
|
def create_empty_file(self, vpath: str) -> bool:
|
||
|
if not self.file_exists(vpath):
|
||
|
try:
|
||
|
f = self.open(vpath, "w+")
|
||
|
except OSError:
|
||
|
# for some reason, we could not create an empty file.
|
||
|
return False
|
||
|
else:
|
||
|
f.close()
|
||
|
|
||
|
return True
|
||
|
|
||
|
def __fspath(self, path: QlPath) -> str:
|
||
|
"""Similar to os.fspath, this method takes a path-like object and returns
|
||
|
its string representation.
|
||
|
"""
|
||
|
|
||
|
if isinstance(path, PathLike):
|
||
|
path = path.__fspath__()
|
||
|
|
||
|
if isinstance(path, str):
|
||
|
return path
|
||
|
|
||
|
elif isinstance(path, bytes):
|
||
|
return path.decode('utf-8')
|
||
|
|
||
|
raise TypeError(path)
|
||
|
|
||
|
def add_mapping(self, vpath: QlPath, binding: Union[QlPath, QlFsMappedObject, Callable], *, force: bool = False) -> None:
|
||
|
"""Create a new mapping in the virtual filesystem.
|
||
|
|
||
|
Args:
|
||
|
vpath: a virtual path to bind
|
||
|
|
||
|
binding: a target to use whenever the bound virtual path is referenced. such a target can be
|
||
|
either a path on the host filesystem, an object instance or a class. the behavior of the mapping
|
||
|
is determined by the bound object type:
|
||
|
[*] a string: bind a path on the host filesystem (e.g. "/dev/urandom"). use with caution!
|
||
|
[*] an object: bind an object instance which will be returned each time the virtual path is opened
|
||
|
[*] a class: bind a class that will be instantiated each time the virtual path is opened
|
||
|
|
||
|
force: when set to `True`, re-mapping an existing vpath becomes possible. In such case, the
|
||
|
old mapping will be discarded
|
||
|
|
||
|
Raises:
|
||
|
`KeyError`: in case the specified vpath has already been mapped (default behavior).
|
||
|
"""
|
||
|
|
||
|
vpath = self.__fspath(vpath)
|
||
|
absvpath = self.path.virtual_abspath(vpath)
|
||
|
|
||
|
if self.has_mapping(absvpath) and not force:
|
||
|
raise KeyError(f'mapping already exists: "{absvpath}"')
|
||
|
|
||
|
if isinstance(binding, (str, bytes, PathLike)):
|
||
|
binding = self.__fspath(binding)
|
||
|
|
||
|
self._mapping[absvpath] = binding
|
||
|
|
||
|
def remove_mapping(self, vpath: QlPath) -> None:
|
||
|
"""Remove a mapping from the fs mapper.
|
||
|
|
||
|
Args:
|
||
|
vpath: bound virtual path to remove
|
||
|
|
||
|
Raises:
|
||
|
`KeyError`: in case the specified vpath has no mapping
|
||
|
"""
|
||
|
|
||
|
vpath = self.__fspath(vpath)
|
||
|
absvpath = self.path.virtual_abspath(vpath)
|
||
|
|
||
|
if not self.has_mapping(absvpath):
|
||
|
raise KeyError(absvpath)
|
||
|
|
||
|
del self._mapping[absvpath]
|
||
|
|
||
|
def rename_mapping(self, old_vpath: str, new_vpath: str) -> None:
|
||
|
old_absvpath = self.path.virtual_abspath(old_vpath)
|
||
|
|
||
|
# vpath to rename does not exist
|
||
|
if not self.has_mapping(old_absvpath):
|
||
|
raise KeyError(old_vpath)
|
||
|
|
||
|
new_absvpath = self.path.virtual_abspath(new_vpath)
|
||
|
|
||
|
# new vpath already exists
|
||
|
if self.has_mapping(new_absvpath):
|
||
|
raise KeyError(new_vpath)
|
||
|
|
||
|
# avoid renaming to the same vapth
|
||
|
if old_absvpath == new_absvpath:
|
||
|
return
|
||
|
|
||
|
binding = self._mapping[old_absvpath]
|
||
|
|
||
|
# remove old mapping and add a new one instead
|
||
|
self._mapping[new_absvpath] = binding
|
||
|
del self._mapping[old_absvpath]
|