From 3d2ad7d896c7b9750f944fdac5b5a3196c948f01 Mon Sep 17 00:00:00 2001 From: Pranshu Date: Sun, 12 Oct 2025 05:30:49 +0530 Subject: [PATCH 1/2] [feature] Suggests next availible subnet #29 Functioin clean() now calculates next availible subnet and adds it as a suggestion in the error message. Added an api endpoint to get next availible subnet for a particular subnet. And modified tests. Fixes #29 --- openwisp_ipam/api/urls.py | 5 + openwisp_ipam/api/views.py | 36 ++++++++ openwisp_ipam/base/models.py | 144 ++++++++++++++++++++++++++++- openwisp_ipam/tests/test_models.py | 13 ++- 4 files changed, 190 insertions(+), 8 deletions(-) diff --git a/openwisp_ipam/api/urls.py b/openwisp_ipam/api/urls.py index 063b684..4678d3b 100644 --- a/openwisp_ipam/api/urls.py +++ b/openwisp_ipam/api/urls.py @@ -15,6 +15,11 @@ def get_api_urls(api_views): api_views.get_next_available_ip, name="get_next_available_ip", ), + path( + "subnet//get-next-available-subnet/", + api_views.get_next_available_subnet, + name="get_next_available_subnet", + ), path( "subnet//request-ip/", api_views.request_ip, diff --git a/openwisp_ipam/api/views.py b/openwisp_ipam/api/views.py index 51d89e9..62acad4 100644 --- a/openwisp_ipam/api/views.py +++ b/openwisp_ipam/api/views.py @@ -174,6 +174,41 @@ def get(self, request, *args, **kwargs): return Response(subnet.get_next_available_ip()) +class AvailableSubnetView( + ProtectedAPIMixin, FilterByOrganizationManaged, RetrieveAPIView +): + subnet_model = Subnet + queryset = Subnet.objects.none() + serializer_class = serializers.Serializer + + def get(self, request, *args, **kwargs): + subnet = get_object_or_404(self.subnet_model, pk=self.kwargs["subnet_id"]) + target_prefix = request.query_params.get("prefix") + parent_subnet = request.query_params.get("parent_subnet") + + # Convert prefix to int if provided + if target_prefix: + try: + target_prefix = int(target_prefix) + except ValueError: + return Response( + {"error": _("Invalid prefix length. Must be an integer.")}, + status=status.HTTP_400_BAD_REQUEST, + ) + + next_subnet = subnet.get_next_available_subnet( + target_prefix=target_prefix, parent_subnet=parent_subnet + ) + + if next_subnet: + return Response({"subnet": next_subnet}) + else: + return Response( + {"error": _("No available subnet found with the specified criteria.")}, + status=status.HTTP_404_NOT_FOUND, + ) + + class IpAddressListCreateView(IpAddressOrgMixin, ProtectedAPIMixin, ListCreateAPIView): queryset = IpAddress.objects.none() subnet_model = Subnet @@ -282,4 +317,5 @@ def get_queryset(self): ip_address = IpAddressView.as_view() subnet_list_ipaddress = IpAddressListCreateView.as_view() get_next_available_ip = AvailableIpView.as_view() +get_next_available_subnet = AvailableSubnetView.as_view() subnet_hosts = SubnetHostsView.as_view() diff --git a/openwisp_ipam/base/models.py b/openwisp_ipam/base/models.py index c3a8be6..0d2595c 100644 --- a/openwisp_ipam/base/models.py +++ b/openwisp_ipam/base/models.py @@ -48,11 +48,48 @@ def __str__(self): def clean(self): if not self.subnet: return - self._validate_multitenant_uniqueness() - self._validate_multitenant_master_subnet() - self._validate_multitenant_unique_child_subnet() - self._validate_overlapping_subnets() - self._validate_master_subnet_consistency() + + try: + self._validate_multitenant_uniqueness() + self._validate_multitenant_master_subnet() + self._validate_multitenant_unique_child_subnet() + self._validate_overlapping_subnets() + self._validate_master_subnet_consistency() + + except ValidationError as e: + next_subnet = self.get_next_available_subnet() + if next_subnet: + self._suggested_subnet = next_subnet + error_dict = e.message_dict.copy() + if "subnet" in error_dict: + original_message = error_dict["subnet"][0] + error_dict["subnet"] = [ + f"{original_message} Suggested alternative: {next_subnet}" + ] + else: + error_dict["subnet"] = [ # fallback + f"Subnet conflict found. Suggested alternative: {next_subnet}" + ] + raise ValidationError(error_dict) + else: + raise e + + def _find_optimal_parent_subnet(self, current_network): + current_prefix = current_network.prefixlen + + if current_network.version == 4: # IPv4 + if current_prefix <= 8: + parent_prefix = 8 + elif current_prefix <= 16: + parent_prefix = max(current_prefix - 4, 8) # 16 options + elif current_prefix <= 24: + parent_prefix = max(current_prefix - 6, 8) # 64 options + else: + parent_prefix = max(current_prefix - 4, 8) # 16 options + else: + parent_prefix = max(current_prefix - 16, 0) # IPv6 : 2^16 options + + return current_network.supernet(new_prefix=parent_prefix) def _validate_multitenant_uniqueness(self): qs = self._meta.model.objects.exclude(pk=self.pk).filter(subnet=self.subnet) @@ -76,6 +113,16 @@ def _validate_multitenant_uniqueness(self): ) } ) + # if adding an organization-specific subnet, + # ensure it's not already taken by the same org + if self.organization and qs.filter(organization=self.organization).exists(): + raise ValidationError( + { + "subnet": _( + "Subnet with this Subnet and Organization already exists." + ) + } + ) def _validate_multitenant_master_subnet(self): if not self.master_subnet: @@ -151,6 +198,93 @@ def _validate_master_subnet_consistency(self): if not ip_network(self.subnet).subnet_of(ip_network(self.master_subnet.subnet)): raise ValidationError({"master_subnet": _("Invalid master subnet.")}) + def get_next_available_subnet(self, target_prefix=None, parent_subnet=None): + if target_prefix is None: + try: + network = ( + ip_network(self.subnet) + if isinstance(self.subnet, str) + else self.subnet + ) + except ValueError: + return None + target_prefix = network.prefixlen + + if parent_subnet: + parent_network = ip_network(parent_subnet) + elif self.master_subnet: + parent_network = ip_network(self.master_subnet.subnet) + else: + current_network = ip_network(self.subnet) + parent_network = self._find_optimal_parent_subnet(current_network) + + if self.organization: + organization_query = Q(organization_id=self.organization_id) | Q( + organization_id__isnull=True + ) + else: + organization_query = Q() + + existing_subnets = self._meta.model.objects.filter( + organization_query + ).exclude(pk=self.pk) + existing_networks = [ip_network(sub.subnet) for sub in existing_subnets] + + for existing in existing_networks: + if parent_network.overlaps(existing) and parent_network.subnet_of( + existing + ): + parent_network = self._find_optimal_parent_subnet(existing) + break + + if self.organization: + organization_query = Q(organization_id=self.organization_id) | Q( + organization_id__isnull=True + ) + else: + organization_query = Q() + + existing_subnets = self._meta.model.objects.filter(organization_query).exclude( + pk=self.pk + ) + existing_networks = { + ip_network(sub.subnet) + for sub in existing_subnets + if ip_network(sub.subnet).subnet_of(parent_network) + or ip_network(sub.subnet).overlaps(parent_network) + } + + for candidate_subnet in parent_network.subnets(new_prefix=target_prefix): + if any( + candidate_subnet.overlaps(existing) for existing in existing_networks + ): + continue + + if self._validate_candidate_subnet(candidate_subnet): + return str(candidate_subnet) + + return None + + def _validate_candidate_subnet(self, candidate_subnet): + try: + temp_instance = self._meta.model( + name=self.name, + subnet=str(candidate_subnet), + description=self.description, + master_subnet=self.master_subnet, + organization=self.organization, + ) + + temp_instance._validate_multitenant_uniqueness() + temp_instance._validate_multitenant_master_subnet() + temp_instance._validate_multitenant_unique_child_subnet() + temp_instance._validate_overlapping_subnets() + temp_instance._validate_master_subnet_consistency() + + return True + except ValidationError: + return False + def get_next_available_ip(self): ipaddress_set = [ip.ip_address for ip in self.ipaddress_set.all()] subnet_hosts = self.subnet.hosts() diff --git a/openwisp_ipam/tests/test_models.py b/openwisp_ipam/tests/test_models.py index 576a7d8..4d4488e 100644 --- a/openwisp_ipam/tests/test_models.py +++ b/openwisp_ipam/tests/test_models.py @@ -138,7 +138,10 @@ def test_overlapping_subnet(self): ) message_dict = context_manager.exception.message_dict self.assertIn("subnet", message_dict) - self.assertIn("Subnet overlaps with 10.0.0.0/16.", message_dict["subnet"]) + self.assertIn( + "Subnet overlaps with 10.0.0.0/16. Suggested alternative: 10.1.0.0/24", + message_dict["subnet"], + ) subnet1.delete() with self.subTest("10.0.0.0/16 overlaps with 10.0.0.0/24"): @@ -149,7 +152,10 @@ def test_overlapping_subnet(self): ) message_dict = context_manager.exception.message_dict self.assertIn("subnet", message_dict) - self.assertIn("Subnet overlaps with 10.0.0.0/24.", message_dict["subnet"]) + self.assertIn( + "Subnet overlaps with 10.0.0.0/24. Suggested alternative: 10.1.0.0/16", + message_dict["subnet"], + ) subnet1.delete() with self.subTest("different orgs do not overlap"): @@ -384,7 +390,8 @@ def test_unique_subnet_multitenancy(self): ) message_dict = context_manager.exception.message_dict self.assertIn( - "Subnet with this Subnet and Organization already exists.", + "Subnet with this Subnet and Organization already exists. \ + Suggested alternative: 10.0.1.0/24", str(message_dict), ) From 1066d1b059754f7e10554e396c6cfcd3ac97885d Mon Sep 17 00:00:00 2001 From: pranshustuff Date: Sun, 19 Oct 2025 20:34:35 +0530 Subject: [PATCH 2/2] [fix] Added API tests and checked tests and QA #29 Same as above. Fixes #29. --- openwisp_ipam/tests/test_api.py | 55 ++++++++++++++++++++++++++++ openwisp_ipam/tests/test_models.py | 6 ++- tests/openwisp2/sample_ipam/views.py | 10 +++++ 3 files changed, 69 insertions(+), 2 deletions(-) diff --git a/openwisp_ipam/tests/test_api.py b/openwisp_ipam/tests/test_api.py index 74bffd8..29a5a73 100644 --- a/openwisp_ipam/tests/test_api.py +++ b/openwisp_ipam/tests/test_api.py @@ -1,4 +1,5 @@ import json +from ipaddress import ip_network from django.contrib.auth import get_user_model from django.core.files.uploadedfile import SimpleUploadedFile @@ -151,6 +152,60 @@ def test_delete_subnet_api(self): self.assertEqual(response.status_code, 204) self.assertEqual(Subnet.objects.count(), 0) + def test_get_next_available_subnet_success(self): + subnet = self._create_subnet(subnet="10.0.0.0/24") + url = reverse("ipam:get_next_available_subnet", kwargs={"subnet_id": subnet.pk}) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertIn("subnet", response.data) + self.assertTrue(ip_network(response.data["subnet"])) + + def test_get_next_available_subnet_invalid_prefix(self): + subnet = self._create_subnet(subnet="10.0.0.0/24") + url = reverse("ipam:get_next_available_subnet", kwargs={"subnet_id": subnet.pk}) + response = self.client.get(f"{url}?prefix=invalid") + self.assertEqual(response.status_code, 400) + self.assertIn("error", response.data) + + def test_get_next_available_subnet_no_available(self): + parent_subnet = self._create_subnet( + subnet="10.0.0.0/23" + ) # 512 IPs, can fit two /24s + + # fill all available /24 subnets + Subnet.objects.create(subnet="10.0.0.0/24", name="Full") + Subnet.objects.create(subnet="10.0.1.0/24", name="Full") + + url = reverse( + "ipam:get_next_available_subnet", kwargs={"subnet_id": parent_subnet.pk} + ) + response = self.client.get( + url, data={"prefix": 24, "parent_subnet": parent_subnet.subnet} + ) + self.assertEqual(response.status_code, 404) + self.assertIn("error", response.data) + + def test_get_next_available_subnet_with_prefix(self): + subnet = self._create_subnet(subnet="10.0.0.0/16") + url = reverse("ipam:get_next_available_subnet", kwargs={"subnet_id": subnet.pk}) + response = self.client.get(f"{url}?prefix=24") + self.assertEqual(response.status_code, 200) + self.assertIn("subnet", response.data) + self.assertTrue(ip_network(response.data["subnet"]).prefixlen == 24) + + def test_get_next_available_subnet_multi_org(self): + subnet1 = self._create_subnet(subnet="10.0.0.0/24") + org2 = self._create_org(name="Other Org") + Subnet.objects.create( + subnet="10.0.0.0/24", organization=org2, name="Other Org Subnet" + ) + url = reverse( + "ipam:get_next_available_subnet", kwargs={"subnet_id": subnet1.pk} + ) + response = self.client.get(url) + self.assertEqual(response.status_code, 200) + self.assertIn("subnet", response.data) + def test_create_ip_address_api(self): subnet_id = self._create_subnet(subnet="10.0.0.0/24").id post_data = self._post_data( diff --git a/openwisp_ipam/tests/test_models.py b/openwisp_ipam/tests/test_models.py index 4d4488e..38025a6 100644 --- a/openwisp_ipam/tests/test_models.py +++ b/openwisp_ipam/tests/test_models.py @@ -390,8 +390,10 @@ def test_unique_subnet_multitenancy(self): ) message_dict = context_manager.exception.message_dict self.assertIn( - "Subnet with this Subnet and Organization already exists. \ - Suggested alternative: 10.0.1.0/24", + ( + "Subnet with this Subnet and Organization already exists. " + "Suggested alternative: 10.0.1.0/24" + ), str(message_dict), ) diff --git a/tests/openwisp2/sample_ipam/views.py b/tests/openwisp2/sample_ipam/views.py index 35bdc3f..5a1f990 100644 --- a/tests/openwisp2/sample_ipam/views.py +++ b/tests/openwisp2/sample_ipam/views.py @@ -7,6 +7,7 @@ from openwisp_ipam.api.views import ( AvailableIpView as BaseAvailableIpView, + AvailableSubnetView as BaseAvailableSubnetView, ExportSubnetView as BaseExportSubnetView, ImportSubnetView as BaseImportSubnetView, IpAddressListCreateView as BaseIpAddressListCreateView, @@ -53,6 +54,14 @@ class SubnetListCreateView(BaseSubnetListCreateView): pass +class AvailableSubnetView(BaseAvailableSubnetView): + """ + Get next available subnet + """ + + pass + + class SubnetView(BaseSubnetView): """ View for retrieving, updating or deleting a subnet instance. @@ -101,4 +110,5 @@ class SubnetHostsView(BaseSubnetHostsView): ip_address = IpAddressView.as_view() subnet_list_ipaddress = IpAddressListCreateView.as_view() get_next_available_ip = AvailableIpView.as_view() +get_next_available_subnet = AvailableSubnetView.as_view() subnet_hosts = SubnetHostsView.as_view()