Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions openwisp_ipam/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def get_api_urls(api_views):
api_views.get_next_available_ip,
name="get_next_available_ip",
),
path(
"subnet/<str:subnet_id>/get-next-available-subnet/",
api_views.get_next_available_subnet,
name="get_next_available_subnet",
),
path(
"subnet/<str:subnet_id>/request-ip/",
api_views.request_ip,
Expand Down
36 changes: 36 additions & 0 deletions openwisp_ipam/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,41 @@ def get(self, request, *args, **kwargs):
return Response(subnet.get_next_available_ip())


class AvailableSubnetView(
ProtectedAPIMixin, FilterByOrganizationManaged, RetrieveAPIView
):
subnet_model = Subnet
queryset = Subnet.objects.none()
serializer_class = serializers.Serializer

def get(self, request, *args, **kwargs):
subnet = get_object_or_404(self.subnet_model, pk=self.kwargs["subnet_id"])
target_prefix = request.query_params.get("prefix")
parent_subnet = request.query_params.get("parent_subnet")

# Convert prefix to int if provided
if target_prefix:
try:
target_prefix = int(target_prefix)
except ValueError:
return Response(
{"error": _("Invalid prefix length. Must be an integer.")},
status=status.HTTP_400_BAD_REQUEST,
)

next_subnet = subnet.get_next_available_subnet(
target_prefix=target_prefix, parent_subnet=parent_subnet
)

if next_subnet:
return Response({"subnet": next_subnet})
else:
return Response(
{"error": _("No available subnet found with the specified criteria.")},
status=status.HTTP_404_NOT_FOUND,
)


class IpAddressListCreateView(IpAddressOrgMixin, ProtectedAPIMixin, ListCreateAPIView):
queryset = IpAddress.objects.none()
subnet_model = Subnet
Expand Down Expand Up @@ -282,4 +317,5 @@ def get_queryset(self):
ip_address = IpAddressView.as_view()
subnet_list_ipaddress = IpAddressListCreateView.as_view()
get_next_available_ip = AvailableIpView.as_view()
get_next_available_subnet = AvailableSubnetView.as_view()
subnet_hosts = SubnetHostsView.as_view()
144 changes: 139 additions & 5 deletions openwisp_ipam/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,48 @@ def __str__(self):
def clean(self):
if not self.subnet:
return
self._validate_multitenant_uniqueness()
self._validate_multitenant_master_subnet()
self._validate_multitenant_unique_child_subnet()
self._validate_overlapping_subnets()
self._validate_master_subnet_consistency()

try:
self._validate_multitenant_uniqueness()
self._validate_multitenant_master_subnet()
self._validate_multitenant_unique_child_subnet()
self._validate_overlapping_subnets()
self._validate_master_subnet_consistency()

except ValidationError as e:
next_subnet = self.get_next_available_subnet()
if next_subnet:
self._suggested_subnet = next_subnet
error_dict = e.message_dict.copy()
if "subnet" in error_dict:
original_message = error_dict["subnet"][0]
error_dict["subnet"] = [
f"{original_message} Suggested alternative: {next_subnet}"
]
else:
error_dict["subnet"] = [ # fallback
f"Subnet conflict found. Suggested alternative: {next_subnet}"
]
raise ValidationError(error_dict)
else:
raise e

def _find_optimal_parent_subnet(self, current_network):
current_prefix = current_network.prefixlen

if current_network.version == 4: # IPv4
if current_prefix <= 8:
parent_prefix = 8
elif current_prefix <= 16:
parent_prefix = max(current_prefix - 4, 8) # 16 options
elif current_prefix <= 24:
parent_prefix = max(current_prefix - 6, 8) # 64 options
else:
parent_prefix = max(current_prefix - 4, 8) # 16 options
else:
parent_prefix = max(current_prefix - 16, 0) # IPv6 : 2^16 options

return current_network.supernet(new_prefix=parent_prefix)

def _validate_multitenant_uniqueness(self):
qs = self._meta.model.objects.exclude(pk=self.pk).filter(subnet=self.subnet)
Expand All @@ -76,6 +113,16 @@ def _validate_multitenant_uniqueness(self):
)
}
)
# if adding an organization-specific subnet,
# ensure it's not already taken by the same org
if self.organization and qs.filter(organization=self.organization).exists():
raise ValidationError(
{
"subnet": _(
"Subnet with this Subnet and Organization already exists."
)
}
)

def _validate_multitenant_master_subnet(self):
if not self.master_subnet:
Expand Down Expand Up @@ -151,6 +198,93 @@ def _validate_master_subnet_consistency(self):
if not ip_network(self.subnet).subnet_of(ip_network(self.master_subnet.subnet)):
raise ValidationError({"master_subnet": _("Invalid master subnet.")})

def get_next_available_subnet(self, target_prefix=None, parent_subnet=None):
if target_prefix is None:
try:
network = (
ip_network(self.subnet)
if isinstance(self.subnet, str)
else self.subnet
)
except ValueError:
return None
target_prefix = network.prefixlen

if parent_subnet:
parent_network = ip_network(parent_subnet)
elif self.master_subnet:
parent_network = ip_network(self.master_subnet.subnet)
else:
current_network = ip_network(self.subnet)
parent_network = self._find_optimal_parent_subnet(current_network)

if self.organization:
organization_query = Q(organization_id=self.organization_id) | Q(
organization_id__isnull=True
)
else:
organization_query = Q()

existing_subnets = self._meta.model.objects.filter(
organization_query
).exclude(pk=self.pk)
existing_networks = [ip_network(sub.subnet) for sub in existing_subnets]

for existing in existing_networks:
if parent_network.overlaps(existing) and parent_network.subnet_of(
existing
):
parent_network = self._find_optimal_parent_subnet(existing)
break

if self.organization:
organization_query = Q(organization_id=self.organization_id) | Q(
organization_id__isnull=True
)
else:
organization_query = Q()

existing_subnets = self._meta.model.objects.filter(organization_query).exclude(
pk=self.pk
)
existing_networks = {
ip_network(sub.subnet)
for sub in existing_subnets
if ip_network(sub.subnet).subnet_of(parent_network)
or ip_network(sub.subnet).overlaps(parent_network)
}

for candidate_subnet in parent_network.subnets(new_prefix=target_prefix):
if any(
candidate_subnet.overlaps(existing) for existing in existing_networks
):
continue

if self._validate_candidate_subnet(candidate_subnet):
return str(candidate_subnet)

return None
Comment on lines +201 to +266
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential performance concern with large parent networks.

When parent_network is large (e.g., a /8 network with target_prefix /24), parent_network.subnets(new_prefix=target_prefix) generates 2^16 = 65,536 candidate subnets to iterate through. For IPv6 with larger prefix differences, this could be exponentially worse.

Consider adding a safeguard to limit the number of candidates evaluated or document the expected behavior for large ranges.

💡 Suggested safeguard
+        MAX_CANDIDATES = 10000  # Prevent runaway iteration
+        candidate_count = 0
         for candidate_subnet in parent_network.subnets(new_prefix=target_prefix):
+            candidate_count += 1
+            if candidate_count > MAX_CANDIDATES:
+                return None  # Too many candidates to evaluate
+
             if any(
                 candidate_subnet.overlaps(existing) for existing in existing_networks
             ):
                 continue
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def get_next_available_subnet(self, target_prefix=None, parent_subnet=None):
if target_prefix is None:
try:
network = (
ip_network(self.subnet)
if isinstance(self.subnet, str)
else self.subnet
)
except ValueError:
return None
target_prefix = network.prefixlen
if parent_subnet:
parent_network = ip_network(parent_subnet)
elif self.master_subnet:
parent_network = ip_network(self.master_subnet.subnet)
else:
current_network = ip_network(self.subnet)
parent_network = self._find_optimal_parent_subnet(current_network)
if self.organization:
organization_query = Q(organization_id=self.organization_id) | Q(
organization_id__isnull=True
)
else:
organization_query = Q()
existing_subnets = self._meta.model.objects.filter(
organization_query
).exclude(pk=self.pk)
existing_networks = [ip_network(sub.subnet) for sub in existing_subnets]
for existing in existing_networks:
if parent_network.overlaps(existing) and parent_network.subnet_of(
existing
):
parent_network = self._find_optimal_parent_subnet(existing)
break
if self.organization:
organization_query = Q(organization_id=self.organization_id) | Q(
organization_id__isnull=True
)
else:
organization_query = Q()
existing_subnets = self._meta.model.objects.filter(organization_query).exclude(
pk=self.pk
)
existing_networks = {
ip_network(sub.subnet)
for sub in existing_subnets
if ip_network(sub.subnet).subnet_of(parent_network)
or ip_network(sub.subnet).overlaps(parent_network)
}
for candidate_subnet in parent_network.subnets(new_prefix=target_prefix):
if any(
candidate_subnet.overlaps(existing) for existing in existing_networks
):
continue
if self._validate_candidate_subnet(candidate_subnet):
return str(candidate_subnet)
return None
def get_next_available_subnet(self, target_prefix=None, parent_subnet=None):
if target_prefix is None:
try:
network = (
ip_network(self.subnet)
if isinstance(self.subnet, str)
else self.subnet
)
except ValueError:
return None
target_prefix = network.prefixlen
if parent_subnet:
parent_network = ip_network(parent_subnet)
elif self.master_subnet:
parent_network = ip_network(self.master_subnet.subnet)
else:
current_network = ip_network(self.subnet)
parent_network = self._find_optimal_parent_subnet(current_network)
if self.organization:
organization_query = Q(organization_id=self.organization_id) | Q(
organization_id__isnull=True
)
else:
organization_query = Q()
existing_subnets = self._meta.model.objects.filter(
organization_query
).exclude(pk=self.pk)
existing_networks = [ip_network(sub.subnet) for sub in existing_subnets]
for existing in existing_networks:
if parent_network.overlaps(existing) and parent_network.subnet_of(
existing
):
parent_network = self._find_optimal_parent_subnet(existing)
break
if self.organization:
organization_query = Q(organization_id=self.organization_id) | Q(
organization_id__isnull=True
)
else:
organization_query = Q()
existing_subnets = self._meta.model.objects.filter(organization_query).exclude(
pk=self.pk
)
existing_networks = {
ip_network(sub.subnet)
for sub in existing_subnets
if ip_network(sub.subnet).subnet_of(parent_network)
or ip_network(sub.subnet).overlaps(parent_network)
}
MAX_CANDIDATES = 10000 # Prevent runaway iteration
candidate_count = 0
for candidate_subnet in parent_network.subnets(new_prefix=target_prefix):
candidate_count += 1
if candidate_count > MAX_CANDIDATES:
return None # Too many candidates to evaluate
if any(
candidate_subnet.overlaps(existing) for existing in existing_networks
):
continue
if self._validate_candidate_subnet(candidate_subnet):
return str(candidate_subnet)
return None
🤖 Prompt for AI Agents
In `@openwisp_ipam/base/models.py` around lines 201 - 266, The loop over
parent_network.subnets(new_prefix=target_prefix) in get_next_available_subnet
can produce huge iteration counts for large parent networks; add a safeguard by
capping evaluated candidates (e.g., introduce MAX_CANDIDATE_SUBNETS or an
optional parameter max_candidates on get_next_available_subnet) and iterate
using a bounded iterator (islice) so you stop after the limit, returning None
(or raising a specific exception) if the cap is reached; ensure the check still
calls _validate_candidate_subnet and preserves existing organization filtering,
and reference get_next_available_subnet, parent_network.subnets, target_prefix,
and _validate_candidate_subnet when making the change.


def _validate_candidate_subnet(self, candidate_subnet):
try:
temp_instance = self._meta.model(
name=self.name,
subnet=str(candidate_subnet),
description=self.description,
master_subnet=self.master_subnet,
organization=self.organization,
)

temp_instance._validate_multitenant_uniqueness()
temp_instance._validate_multitenant_master_subnet()
temp_instance._validate_multitenant_unique_child_subnet()
temp_instance._validate_overlapping_subnets()
temp_instance._validate_master_subnet_consistency()

return True
except ValidationError:
return False

def get_next_available_ip(self):
ipaddress_set = [ip.ip_address for ip in self.ipaddress_set.all()]
subnet_hosts = self.subnet.hosts()
Expand Down
55 changes: 55 additions & 0 deletions openwisp_ipam/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from ipaddress import ip_network

from django.contrib.auth import get_user_model
from django.core.files.uploadedfile import SimpleUploadedFile
Expand Down Expand Up @@ -151,6 +152,60 @@ def test_delete_subnet_api(self):
self.assertEqual(response.status_code, 204)
self.assertEqual(Subnet.objects.count(), 0)

def test_get_next_available_subnet_success(self):
subnet = self._create_subnet(subnet="10.0.0.0/24")
url = reverse("ipam:get_next_available_subnet", kwargs={"subnet_id": subnet.pk})
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.assertIn("subnet", response.data)
self.assertTrue(ip_network(response.data["subnet"]))

def test_get_next_available_subnet_invalid_prefix(self):
subnet = self._create_subnet(subnet="10.0.0.0/24")
url = reverse("ipam:get_next_available_subnet", kwargs={"subnet_id": subnet.pk})
response = self.client.get(f"{url}?prefix=invalid")
self.assertEqual(response.status_code, 400)
self.assertIn("error", response.data)

def test_get_next_available_subnet_no_available(self):
parent_subnet = self._create_subnet(
subnet="10.0.0.0/23"
) # 512 IPs, can fit two /24s

# fill all available /24 subnets
Subnet.objects.create(subnet="10.0.0.0/24", name="Full")
Subnet.objects.create(subnet="10.0.1.0/24", name="Full")

url = reverse(
"ipam:get_next_available_subnet", kwargs={"subnet_id": parent_subnet.pk}
)
response = self.client.get(
url, data={"prefix": 24, "parent_subnet": parent_subnet.subnet}
)
self.assertEqual(response.status_code, 404)
self.assertIn("error", response.data)

def test_get_next_available_subnet_with_prefix(self):
subnet = self._create_subnet(subnet="10.0.0.0/16")
url = reverse("ipam:get_next_available_subnet", kwargs={"subnet_id": subnet.pk})
response = self.client.get(f"{url}?prefix=24")
self.assertEqual(response.status_code, 200)
self.assertIn("subnet", response.data)
self.assertTrue(ip_network(response.data["subnet"]).prefixlen == 24)

def test_get_next_available_subnet_multi_org(self):
subnet1 = self._create_subnet(subnet="10.0.0.0/24")
org2 = self._create_org(name="Other Org")
Subnet.objects.create(
subnet="10.0.0.0/24", organization=org2, name="Other Org Subnet"
)
url = reverse(
"ipam:get_next_available_subnet", kwargs={"subnet_id": subnet1.pk}
)
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.assertIn("subnet", response.data)

def test_create_ip_address_api(self):
subnet_id = self._create_subnet(subnet="10.0.0.0/24").id
post_data = self._post_data(
Expand Down
15 changes: 12 additions & 3 deletions openwisp_ipam/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ def test_overlapping_subnet(self):
)
message_dict = context_manager.exception.message_dict
self.assertIn("subnet", message_dict)
self.assertIn("Subnet overlaps with 10.0.0.0/16.", message_dict["subnet"])
self.assertIn(
"Subnet overlaps with 10.0.0.0/16. Suggested alternative: 10.1.0.0/24",
message_dict["subnet"],
)
subnet1.delete()

with self.subTest("10.0.0.0/16 overlaps with 10.0.0.0/24"):
Expand All @@ -149,7 +152,10 @@ def test_overlapping_subnet(self):
)
message_dict = context_manager.exception.message_dict
self.assertIn("subnet", message_dict)
self.assertIn("Subnet overlaps with 10.0.0.0/24.", message_dict["subnet"])
self.assertIn(
"Subnet overlaps with 10.0.0.0/24. Suggested alternative: 10.1.0.0/16",
message_dict["subnet"],
)
subnet1.delete()

with self.subTest("different orgs do not overlap"):
Expand Down Expand Up @@ -384,7 +390,10 @@ def test_unique_subnet_multitenancy(self):
)
message_dict = context_manager.exception.message_dict
self.assertIn(
"Subnet with this Subnet and Organization already exists.",
(
"Subnet with this Subnet and Organization already exists. "
"Suggested alternative: 10.0.1.0/24"
),
str(message_dict),
)

Expand Down
10 changes: 10 additions & 0 deletions tests/openwisp2/sample_ipam/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from openwisp_ipam.api.views import (
AvailableIpView as BaseAvailableIpView,
AvailableSubnetView as BaseAvailableSubnetView,
ExportSubnetView as BaseExportSubnetView,
ImportSubnetView as BaseImportSubnetView,
IpAddressListCreateView as BaseIpAddressListCreateView,
Expand Down Expand Up @@ -53,6 +54,14 @@ class SubnetListCreateView(BaseSubnetListCreateView):
pass


class AvailableSubnetView(BaseAvailableSubnetView):
"""
Get next available subnet
"""

pass


class SubnetView(BaseSubnetView):
"""
View for retrieving, updating or deleting a subnet instance.
Expand Down Expand Up @@ -101,4 +110,5 @@ class SubnetHostsView(BaseSubnetHostsView):
ip_address = IpAddressView.as_view()
subnet_list_ipaddress = IpAddressListCreateView.as_view()
get_next_available_ip = AvailableIpView.as_view()
get_next_available_subnet = AvailableSubnetView.as_view()
subnet_hosts = SubnetHostsView.as_view()
Loading