From 4c73b26b9783d85ca804ff0252d9f8fca6bc8cd5 Mon Sep 17 00:00:00 2001 From: Leo Kirchner Date: Tue, 22 Oct 2024 07:14:42 +0200 Subject: [PATCH] wip --- diffsync/nextgen.py | 143 ++++++++++++++++++++++++++++++++++++ tests/unit/test_diffsync.py | 2 +- tests/unit/test_nextgen.py | 49 ++++++++++++ 3 files changed, 193 insertions(+), 1 deletion(-) create mode 100644 diffsync/nextgen.py create mode 100644 tests/unit/test_nextgen.py diff --git a/diffsync/nextgen.py b/diffsync/nextgen.py new file mode 100644 index 0000000..7ee0254 --- /dev/null +++ b/diffsync/nextgen.py @@ -0,0 +1,143 @@ +import enum +from collections import defaultdict +from typing import Dict, NewType, ClassVar, TypedDict, Any, Optional, Set, Tuple, Union +from typing_extensions import Self + +from pydantic import BaseModel, ConfigDict + +ModelName = NewType("ModelName", str) + + +class NGObjectAlreadyExists(Exception): + pass + + +class NGModelFieldKind(enum.Enum): + """Specify type of field.""" + ATTRIBUTE = "attribute" + IDENTIFIER = "identifier" + + +class NGModelMetadata(TypedDict): + """Used to store metadata about a model. + + """ + model_name: ModelName + + +class NGModel(BaseModel): + # Class vars are automatically excluded attributes: https://docs.pydantic.dev/latest/concepts/models/#class-vars + model_config = ConfigDict(frozen=True) + metadata: ClassVar[NGModelMetadata] + identifiers: ClassVar[Set[str]] = set() + attributes: ClassVar[Set[str]] = set() + + @classmethod + def __pydantic_init_subclass__(cls, **kwargs: Any) -> None: + """Set class vars and validate that fields have their field type set. + + Ensure that + - field types are set as type annotations + - `identifiers` and `attributes` class vars are set according to those field types + """ + field_categories = { + kind: [] for kind in NGModelFieldKind + } + for field_name, field_info in cls.model_fields.items(): + field_type = next((m for m in field_info.metadata if isinstance(m, NGModelFieldKind)), None) + if not field_type: + raise ValueError(f"Field '{field_name}' on '{cls.__name__}' does not define a field type.") + field_categories[field_type].append(field_name) + cls.identifiers = set(field_categories[NGModelFieldKind.IDENTIFIER]) + cls.attributes = set(field_categories[NGModelFieldKind.ATTRIBUTE]) + if not hasattr(cls, "__hash__"): + raise ValueError("All fields must be hashable.") + + def get_identifiers(self) -> Dict[str, Any]: + return self.model_dump(include=self.identifiers) + + def get_attributes(self) -> Dict[str, Any]: + return self.model_dump(include=self.attributes) + + def diff_to(self, other: "NGModel") -> frozenset[Tuple[str, Any]]: + """Diff to another model. + + :returns: Dictionary that shows which fields need to be updated to what values + """ + my_attributes = set(self.model_dump(include=self.attributes).items()) + other_attributes = set(other.model_dump(include=other.attributes).items()) + return frozenset(other_attributes - my_attributes) + + +class NGAdapter: + def __init__(self): + self._store: Dict[ModelName, Dict[frozenset, NGModel]] = defaultdict(dict) + + def add(self, obj: NGModel) -> None: + """Adds a model to the store. + + :raises NGObjectAlreadyExists: Raised when the identifier key is already present in the store. + """ + key = frozenset(obj.get_identifiers().items()) + if key in self._store[obj.metadata["model_name"]]: + raise NGObjectAlreadyExists(f"Failed adding \"{obj}\". Key '{key}' already exists.") + self._store[obj.metadata["model_name"]][key] = obj + + def get_keys(self, model_name: ModelName) -> Set[frozenset]: + return set(self._store[model_name].keys()) + + def get(self, model_name: ModelName, key: Union[frozenset[Tuple[str, Any]], Dict[str, Any]]) -> Optional[NGModel]: + if isinstance(key, dict): + key = frozenset(key.items()) + return self._store[model_name].get(key, None) + + @property + def models(self) -> Set[ModelName]: + return set(self._store.keys()) + + +class NGDiff: + def __init__( + self, + to_create: Dict[ModelName, Set[frozenset]], + to_delete: Dict[ModelName, Set[frozenset]], + to_update: Dict[ModelName, Dict[frozenset, dict[str, Any]]], + ): + self.to_create = to_create + self.to_delete = to_delete + self.to_update = to_update + + def report(self): + output = "" + model_names = set(self.to_create.keys()) | set(self.to_delete.keys()) | set(self.to_update.keys()) + for model_name in sorted(model_names): + output += f"{model_name}:\n" + if self.to_create[model_name]: + output += "+\n" + for key in self.to_create[model_name]: + output += f"- {key}\n" + if self.to_delete[model_name]: + output += "-\n" + for key in self.to_delete[model_name]: + output += f"- {key}\n" + if self.to_update[model_name]: + output += "~\n" + for key in self.to_update[model_name]: + output += f"- {key}\n" + return output + + @classmethod + def diff(cls, source: NGAdapter, destination: NGAdapter) -> Self: + to_create: Dict[ModelName, Set[frozenset]] = {} + to_delete: Dict[ModelName, Set[frozenset]] = {} + to_update: Dict[ModelName, Dict[frozenset, dict[str, Any]]] = defaultdict(dict) + models_to_diff = source.models | destination.models + for model_name in models_to_diff: + existing_source_keys = source.get_keys(model_name) + existing_destination_keys = destination.get_keys(model_name) + to_create[model_name] = existing_source_keys - existing_destination_keys + to_delete[model_name] = existing_destination_keys - existing_source_keys + for key in existing_source_keys & existing_destination_keys: + if diff_dict := source.get(model_name, key).diff_to(destination.get(model_name, key)): + to_update[model_name][key] = diff_dict + return cls(to_create=to_create, to_delete=to_delete, to_update=to_update) diff --git a/tests/unit/test_diffsync.py b/tests/unit/test_diffsync.py index 088bd4b..a87fd46 100644 --- a/tests/unit/test_diffsync.py +++ b/tests/unit/test_diffsync.py @@ -835,7 +835,7 @@ def test_diffsync_remove_missing_child(log, backend_a): backend_a.remove(rdu_spine1, remove_children=True) assert log.has( "Unable to remove child element as it was not found!", - store=backend_a.store, + store=backend_a._store, parent_id=str(rdu_spine1), parent_type=rdu_spine1.get_type(), child_id=str(rdu_spine1_eth0), diff --git a/tests/unit/test_nextgen.py b/tests/unit/test_nextgen.py new file mode 100644 index 0000000..3108406 --- /dev/null +++ b/tests/unit/test_nextgen.py @@ -0,0 +1,49 @@ +from typing import Annotated + +from diffsync.nextgen import NGAdapter, NGModel, NGModelFieldKind, NGDiff + + +class NGTestModel(NGModel): + metadata = { + "model_name": "Test" + } + identifier: Annotated[int, NGModelFieldKind.IDENTIFIER] + int_value: Annotated[int, NGModelFieldKind.ATTRIBUTE] = 0 + str_value: Annotated[str, NGModelFieldKind.ATTRIBUTE] = "" + + +def test_get_identifiers(): + instance = NGTestModel(identifier=1) + assert instance.get_identifiers() == {"identifier": instance.identifier} + + +def test_adapter_add_get(): + """Test that the `add` and `get` methods on the adapter work.""" + adapter = NGAdapter() + instance = NGTestModel(identifier=1) + adapter.add(instance) + assert adapter.get(NGTestModel.metadata["model_name"], {"identifier": 1}) == instance + + +def test_diff_model(): + a = NGTestModel(identifier=5, int_value=10, str_value="a") + b = NGTestModel(identifier=5, int_value=10, str_value="b") + + assert dict(a.diff_to(b)) == {"str_value": b.str_value} + + +def test_diff(): + only_in_a = NGTestModel(identifier=1, int_value=10, str_value="Only in A") + in_both_a = NGTestModel(identifier=2, int_value=10, str_value="In A") + in_both_b = NGTestModel(identifier=2, int_value=10, str_value="In B") + only_in_b = NGTestModel(identifier=3, int_value=30, str_value="Only in B") + + adapter_a = NGAdapter() + adapter_a.add(only_in_a) + adapter_a.add(in_both_a) + adapter_b = NGAdapter() + adapter_b.add(only_in_b) + adapter_b.add(in_both_b) + + resulting_diff = NGDiff.diff(adapter_a, adapter_b) + assert resulting_diff.to_create[NGTestModel.metadata["model_name"]] == frozenset(only_in_b) \ No newline at end of file