import collections
import json
import math
import reprlib
import struct
from contextlib import contextmanager
from contextlib import suppress
from typing import Any
from typing import Counter as CounterType
from typing import Iterable
from typing import Iterator
from typing import MutableSet
from typing import Optional
from typing import Sequence
from typing import Type
from typing import Union
from typing import overload
from paquo._logging import get_logger
from paquo._utils import cached_property
from paquo.classes import QuPathPathClass
from paquo.java import GsonTools
from paquo.java import IllegalArgumentException
from paquo.java import PathObjectHierarchy
from paquo.java import compatibility
from paquo.pathobjects import BaseGeometry
from paquo.pathobjects import PathROIObjectType
from paquo.pathobjects import QuPathPathAnnotationObject
from paquo.pathobjects import QuPathPathDetectionObject
from paquo.pathobjects import QuPathPathTileObject
from paquo.pathobjects import fix_geojson_geometry
__all__ = ["QuPathPathObjectHierarchy"]
_logger = get_logger(__name__)
class PathObjectProxy(Sequence[PathROIObjectType], MutableSet[PathROIObjectType]):
"""set interface for path objects with support for access by index and slicing
*not meant to be instantiated by the user*
Notes
-----
Access this proxy via the `QuPathPathObjectHierarchy.annotations` or
`QuPathPathObjectHierarchy.detections` properties. It acts just like
a python set, but also supports access by index and slicing.
"""
def __init__(
self,
hierarchy: 'QuPathPathObjectHierarchy',
paquo_cls: Type[PathROIObjectType],
mask: Optional[Union[slice, Sequence[int]]] = None,
) -> None:
"""internal: not meant to be instantiated by the user"""
self._hierarchy = hierarchy
self._paquo_cls = paquo_cls
if not (
mask is None
or isinstance(mask, slice)
or (all(isinstance(x, int) for x in mask) and len(mask) > 0)
):
raise TypeError(f"mask can be slice, or Sequence[int] or None. Got: {type(mask)!r}")
self._mask: Optional[Union[slice, Sequence[int]]] = mask
@property
def _readonly(self) -> bool:
# noinspection PyProtectedMember
return self._hierarchy._readonly or self._mask is not None
@property
def _java_hierarchy(self):
return self._hierarchy.java_object
@cached_property
def _list(self):
_list = self._java_hierarchy.getObjects(None, self._paquo_cls.java_class)
if self._mask:
if isinstance(self._mask, slice):
_list = _list[self._mask]
else:
_list = [_list[x] for x in self._mask]
return _list
def _list_invalidate_cache(self):
with suppress(KeyError):
del self.__dict__["_list"]
def _disabled(self, other: Iterable[Any]) -> "PathObjectProxy":
raise NotImplementedError(f"{type(self).__name__} only supports inplace operations: '|=', '-='")
__or__ = __and__ = __sub__ = __xor__ = __iand__ = __ixor__ = _disabled
del _disabled
def __ior__(self, other: Iterable[Any]) -> "PathObjectProxy": # type: ignore
if self._mask:
raise OSError("cannot modify view")
if self._readonly:
raise OSError("project in readonly mode")
path_objects = [x.java_object for x in other]
try:
self._java_hierarchy.addPathObjects(path_objects)
finally:
self._list_invalidate_cache()
return self
update = __ior__
def __isub__(self, other: Iterable[Any]) -> "PathObjectProxy": # type: ignore
if self._mask:
raise OSError("cannot modify view")
if self._readonly:
raise OSError("project in readonly mode")
path_objects = [x.java_object for x in other]
try:
self._java_hierarchy.removeObjects(path_objects, True)
finally:
self._list_invalidate_cache()
return self
if compatibility.supports_newer_addobject_and_pathclass():
def add(self, x: PathROIObjectType) -> None:
"""adds a new path object to the proxy"""
if self._mask:
raise OSError("cannot modify view")
if self._readonly:
raise OSError("project in readonly mode")
if not isinstance(x, self._paquo_cls):
raise TypeError(f"requires {self._paquo_cls.__name__} instance got {x.__class__.__name__}")
try:
if self._hierarchy.autoflush:
self._java_hierarchy.addObject(x.java_object, True)
else:
self._java_hierarchy.addObject(x.java_object, False)
finally:
self._list_invalidate_cache()
else:
def add(self, x: PathROIObjectType) -> None:
"""adds a new path object to the proxy"""
if self._mask:
raise OSError("cannot modify view")
if self._readonly:
raise OSError("project in readonly mode")
if not isinstance(x, self._paquo_cls):
raise TypeError(f"requires {self._paquo_cls.__name__} instance got {x.__class__.__name__}")
try:
if self._hierarchy.autoflush:
self._java_hierarchy.addPathObject(x.java_object)
else:
self._java_hierarchy.addPathObjectWithoutUpdate(x.java_object)
finally:
self._list_invalidate_cache()
def discard(self, x: PathROIObjectType) -> None:
"""discard a path object from the proxy"""
if self._mask:
raise OSError("cannot modify view")
if self._readonly:
raise OSError("project in readonly mode")
if not isinstance(x, self._paquo_cls):
raise TypeError(f"requires {self._paquo_cls.__name__} instance got {x.__class__.__name__}")
try:
if self._hierarchy.autoflush:
self._java_hierarchy.removeObject(x.java_object, True)
else:
self._java_hierarchy.removeObjectWithoutUpdate(x.java_object, True)
finally:
self._list_invalidate_cache()
def clear(self) -> None:
"""clear all path objects from the proxy"""
if self._mask:
raise OSError("cannot modify view")
if self._readonly:
raise OSError("project in readonly mode")
try:
self._java_hierarchy.removeObjects(self._list, True)
finally:
self._list_invalidate_cache()
def __contains__(self, x: Any) -> bool:
"""test if path object is in proxy"""
# ... inHierarchy is private
# return bool(self._java_hierarchy.inHierarchy(x.java_object))
if not isinstance(x, self._paquo_cls):
return False
while x.parent is not None:
x = x.parent
return bool(x.java_object == self._java_hierarchy.getRootObject())
def __len__(self) -> int:
return len(self._list)
def __iter__(self: "PathObjectProxy") -> Iterator[PathROIObjectType]:
for obj in self._list:
yield self._paquo_cls(obj, update_callback=self.add)
@overload
def __getitem__(self, i: int) -> PathROIObjectType: ...
@overload
def __getitem__(self, i: slice) -> "PathObjectProxy": ...
@overload
def __getitem__(self, i: Sequence[int]) -> "PathObjectProxy": ...
def __getitem__(self, i):
if isinstance(i, int):
return self._paquo_cls(self._list[i], update_callback=self.add)
elif isinstance(i, slice):
if self._mask is None:
mask = i
elif isinstance(self._mask, slice):
_r = range(self._java_hierarchy.nObjects())
_s = _r[self._mask][i]
mask = slice(_s.start, _s.stop, _s.step)
else:
mask = self._mask[i]
return PathObjectProxy(self._hierarchy, self._paquo_cls, mask)
else:
if self._mask is None:
mask = i
elif isinstance(self._mask, slice):
_r = range(self._java_hierarchy.nObjects())
_s = _r[self._mask]
mask = [_s[idx] for idx in i]
else:
mask = [self._mask[idx] for idx in i]
return PathObjectProxy(self._hierarchy, self._paquo_cls, mask)
def count(self, value: PathROIObjectType) -> int:
return int(value in self) # PathObjectProxy is a set
def __repr__(self):
c = type(self).__name__
h = repr(self._hierarchy)
p = self._paquo_cls.__name__
m = reprlib.repr(self._mask)
i = f"0x{hex(id(self))}"
if m is None:
return f"<{c} hierarchy={h} paquo_cls={p} at {i}>"
return f"<{c} hierarchy={h} paquo_cls={p} mask={m} at {i}>"
[docs]class QuPathPathObjectHierarchy:
java_object: PathObjectHierarchy
def __init__(
self,
hierarchy: Optional[PathObjectHierarchy] = None,
*,
readonly: bool = False,
image_name: str = "N/A",
autoflush: bool = True,
) -> None:
"""qupath hierarchy stores all annotation objects
Parameters
----------
hierarchy:
a PathObjectHierarchy instance (optional)
Usually accessed directly via the Image Container.
"""
if hierarchy is None:
hierarchy = PathObjectHierarchy()
self.java_object = hierarchy
# internals
self._image_name = str(image_name)
self._readonly = bool(readonly)
self._annotations = PathObjectProxy(self, paquo_cls=QuPathPathAnnotationObject)
self._detections = PathObjectProxy(self, paquo_cls=QuPathPathDetectionObject)
# attrs
self.autoflush = bool(autoflush)
def __len__(self) -> int:
"""Number of objects in hierarchy (all types)"""
return int(self.java_object.nObjects())
@property
def is_empty(self) -> bool:
"""a hierarchy is empty if it only contains the root object"""
return bool(self.java_object.isEmpty())
[docs] @contextmanager
def no_autoflush(self):
"""prevent updates to the hierarchy to trigger an internal update event"""
_autoflush, self.autoflush = self.autoflush, False
try:
yield self
finally:
self.flush(invalidate_proxy_cache=True)
self.autoflush = _autoflush
# noinspection PyProtectedMember
[docs] def flush(self, invalidate_proxy_cache: bool = False):
"""flush changes to the hierarchy"""
root = self.java_object.getRootObject()
self.java_object.fireHierarchyChangedEvent(root)
if invalidate_proxy_cache:
self._annotations._list_invalidate_cache()
self._detections._list_invalidate_cache()
@property
def root(self) -> QuPathPathAnnotationObject:
"""the hierarchy root node
This object has no roi and cannot be assigned another class.
All other objects are descendants of this object if they are
attached to this hierarchy.
"""
root = self.java_object.getRootObject()
return QuPathPathAnnotationObject(root) # todo: specialize...
@property
def annotations(self) -> PathObjectProxy[QuPathPathAnnotationObject]:
"""all annotations provided as a flattened set-like proxy"""
return self._annotations
[docs] def add_annotation(self,
roi: BaseGeometry,
path_class: Optional[QuPathPathClass] = None,
measurements: Optional[dict] = None,
*,
path_class_probability: float = math.nan) -> QuPathPathAnnotationObject:
"""convenience method for adding annotations"""
if self._readonly:
raise OSError("project in readonly mode")
obj = QuPathPathAnnotationObject.from_shapely(
roi, path_class, measurements,
path_class_probability=path_class_probability
)
self._annotations.add(obj)
return obj
@property
def detections(self) -> PathObjectProxy[QuPathPathDetectionObject]:
"""all detections provided as a flattened set-like proxy"""
return self._detections
def add_detection(self,
roi: BaseGeometry,
path_class: Optional[QuPathPathClass] = None,
measurements: Optional[dict] = None,
*,
path_class_probability: float = math.nan) -> QuPathPathDetectionObject:
if self._readonly:
raise OSError("project in readonly mode")
"""convenience method for adding detections
Notes
-----
these will be added to self.detections
"""
obj = QuPathPathDetectionObject.from_shapely(
roi, path_class, measurements,
path_class_probability=path_class_probability
)
self._detections.add(obj)
return obj
[docs] def add_tile(self,
roi: BaseGeometry,
path_class: Optional[QuPathPathClass] = None,
measurements: Optional[dict] = None,
*,
path_class_probability: float = math.nan) -> QuPathPathTileObject:
"""convenience method for adding tile detections
Notes
-----
these will be added to self.detections
"""
if self._readonly:
raise OSError("project in readonly mode")
obj = QuPathPathTileObject.from_shapely(
roi, path_class, measurements,
path_class_probability=path_class_probability
)
self._detections.add(obj)
return obj
[docs] def to_geojson(self) -> list:
"""return all annotations as a list of geojson features"""
gson = GsonTools.getInstance()
geojson = gson.toJson(self.java_object.getAnnotationObjects())
return list(json.loads(str(geojson)))
[docs] def load_geojson(
self, geojson: list,
*, raise_on_skip: bool = False, fix_invalid: bool = False,
) -> bool:
"""load annotations into this hierarchy from a geojson list
returns True if new objects were added, False otherwise.
"""
# todo: use geojson module for type checking?
if self._readonly:
raise OSError("project in readonly mode")
if not isinstance(geojson, list):
raise TypeError("requires a geojson list")
aos = []
skipped: "CounterType[str]" = collections.Counter()
for annotation in geojson:
try:
if fix_invalid:
annotation["geometry"] = fix_geojson_geometry(annotation["geometry"])
# compatibility layer
# todo: should maybe test at the beginning of this method
# if the version supports id or not, instead of checking
# the version number...
if (
compatibility.requires_annotation_json_fix()
and 'id' not in annotation
):
object_type = annotation['properties'].get("object_type", "unknown")
object_id = {
'annotation': "PathAnnotationObject",
'detection': "PathDetectionObject",
'tile': "PathTileObject",
'cell': "PathCellObject",
'tma_core': "TMACoreObject",
'root': "PathRootObject",
'unknown': "PathAnnotationObject",
}.get(object_type, None)
if object_id is None:
_logger.warn(f"annotation has incompatible object_type: '{object_type}'")
object_id = "PathAnnotationObject"
annotation['id'] = object_id
ao = QuPathPathAnnotationObject.from_geojson(annotation)
except (IllegalArgumentException, ValueError) as err:
_logger.warn(f"Annotation skipped: {err}")
class_ = annotation["properties"].get("classification", {}).get("name", "UNDEFINED")
skipped[class_] += 1
continue
else:
aos.append(ao.java_object)
if skipped:
n_skipped = sum(skipped.values())
if raise_on_skip:
raise ValueError(f"could not convert {n_skipped} annotations")
_logger.error(
f"skipped {n_skipped} annotation objects: {skipped.most_common()}"
)
updated = bool(self.java_object.insertPathObjects(aos))
if updated:
self.flush(invalidate_proxy_cache=True)
return updated
[docs] def to_ome_xml(self, prefix="paquo", fill_alpha=0.0) -> str:
"""return all annotations in ome xml format"""
# this
try:
from ome_types import to_xml
from ome_types.model import OME
from ome_types.model import ROI
from ome_types.model import AnnotationRef
from ome_types.model import Ellipse as OmeEllipse
from ome_types.model import Line as OmeLine
from ome_types.model import Map
from ome_types.model import MapAnnotation
from ome_types.model import Point as OmePoint
from ome_types.model import Polygon as OmePolygon
from ome_types.model import Polyline as OmePolyline
from ome_types.model import Rectangle as OmeRectangle
from ome_types.model.map import M
from ome_types.model.shape import FillRule
except ImportError:
raise RuntimeError(f"{type(self).__name__}.to_ome_xml requires 'ome-types' python module and python>=3.7")
from paquo.java import EllipseROI
from paquo.java import GeometryROI
from paquo.java import LineROI
from paquo.java import PointsROI
from paquo.java import PolygonROI
from paquo.java import PolylineROI
from paquo.java import RectangleROI
ome = OME()
for ao in self.annotations:
class_name: Optional[str]
if ao.path_class:
class_name = ao.path_class.name
else:
class_name = None
# --- create the map_annotation
_m = {
f"{prefix}:object_type": {
"PathAnnotationObject": 'annotation',
"PathDetectionObject": 'detection',
"PathTileObject": 'tile',
"PathCellObject": 'cell',
"TMACoreObject": 'tma_core',
"PathRootObject": 'root',
}.get(type(ao.java_object).__name__.rpartition(".")[2], "unknown")
}
if class_name:
_m[f"{prefix}:path_class"] = class_name
if ao.name:
_m[f"{prefix}:name"] = ao.name
for k, v in ao.measurements.items():
_m[f"{prefix}:measurement:{k}"] = v
map_annotation = MapAnnotation(value=Map(m=[
M(k=_key, value=str(_value))
for _key, _value in _m.items()
]))
# --- prepare common kwargs for ome Shape
if ao.path_class and ao.path_class.color:
# https://www.openmicroscopy.org/Schemas/Documentation/Generated/OME-2016-06/ome_xsd.html#Color
r, g, b, a = ao.path_class.color.to_rgba()
stroke_color, = struct.unpack(">i", bytes([r, g, b, a]))
if fill_alpha <= 0:
fill_color = None
else:
fill_alpha = min(max(0.0, fill_alpha), 1.0)
fill_color, = struct.unpack(">i", bytes([r, g, b, int(fill_alpha*a)]))
else:
stroke_color = None
fill_color = None
# https://www.openmicroscopy.org/Schemas/Documentation/Generated/OME-2016-06/ome_xsd.html#Shape_FillRule
fill_rule = FillRule.NON_ZERO
qp_roi = ao.java_object.getROI()
the_c = int(qp_roi.getC())
the_t = int(qp_roi.getT())
the_z = int(qp_roi.getZ())
shape_kwargs = dict(
# id=...,
# annotation_ref=...,
fill_color=fill_color,
fill_rule=fill_rule,
# font_family=None,
# font_size=None,
# font_size_unit=UnitsLength("pt"),
# font_style=None,
locked=ao.locked,
stroke_color=stroke_color,
stroke_dash_array=None,
stroke_width=2, # 2px is the QuPath default gui setting
# stroke_width_unit=UnitsLength("pixel"),
text=class_name, # again class instead of ao.description for a nicer UX in omero.iviewer...
the_c=the_c if the_c >= 0 else None,
the_t=the_t,
the_z=the_z,
transform=None,
)
# --- create the roi
roi = ROI(name=class_name)
roi.annotation_ref.append(AnnotationRef(id=map_annotation.id))
# --- add the correct shape dependent on roi types
ome_shape: Any
if isinstance(qp_roi, (PolygonROI, GeometryROI)):
ome_shape = OmePolygon(
points=" ".join(f"{p.getX():f},{p.getY():f}" for p in qp_roi.getAllPoints()),
**shape_kwargs,
)
elif isinstance(qp_roi, EllipseROI):
# https://github.com/qupath/qupath/blob/e84467e86751e5aa542ab68a4915b70ecbf2f6fc/qupath-core/src/main/java/qupath/lib/roi/EllipseROI.java#L60-L63
ome_shape = OmeEllipse(
radius_x=float(qp_roi.getBoundsWidth() * 0.5),
radius_y=float(qp_roi.getBoundsHeight() * 0.5),
x=float(qp_roi.getCentroidX()),
y=float(qp_roi.getCentroidY()),
**shape_kwargs,
)
elif isinstance(qp_roi, RectangleROI):
ome_shape = OmeRectangle(
height=float(qp_roi.getBoundsHeight()),
width=float(qp_roi.getBoundsWidth()),
x=float(qp_roi.getBoundsX()),
y=float(qp_roi.getBoundsY()),
**shape_kwargs,
)
elif isinstance(qp_roi, LineROI):
ome_shape = OmeLine(
x1=float(qp_roi.getX1()),
x2=float(qp_roi.getX2()),
y1=float(qp_roi.getY1()),
y2=float(qp_roi.getY2()),
marker_end=None,
marker_start=None,
**shape_kwargs,
)
elif isinstance(qp_roi, PolylineROI):
ome_shape = OmePolyline(
points=" ".join(f"{p.getX():f},{p.getY():f}" for p in qp_roi.getAllPoints()),
marker_end=None,
marker_start=None,
**shape_kwargs,
)
elif isinstance(qp_roi, PointsROI):
# we have to create individual points in ome
ome_shape = [
OmePoint(x=float(p.getX()), y=float(p.getY()), **shape_kwargs)
for p in qp_roi.getAllPoints()
]
else:
raise NotImplementedError(f"todo {type(qp_roi).__name__}")
if isinstance(ome_shape, list):
roi.union.extend(ome_shape)
else:
roi.union.append(ome_shape)
# --- add the annotation to the ome structure
ome.rois.append(roi)
ome.structured_annotations.append(map_annotation)
return to_xml(ome)
def __repr__(self):
return f"Hierarchy(image={self._image_name}, annotations={len(self._annotations)}, detections={len(self._detections)})"
def _repr_html_(self):
from paquo._repr import br
from paquo._repr import div
from paquo._repr import h4
from paquo._repr import p
from paquo._repr import span
return div(
h4(text=f"Hierarchy: {self._image_name}", style={"margin-top": "0"}),
p(
span(text="annotations: ", style={"font-weight": "bold"}),
span(text=f"{len(self._annotations)}"),
br(),
span(text="detections: ", style={"font-weight": "bold"}),
span(text=f"{len(self._detections)}"),
style={"margin": "0.5em"},
),
)