|
| 1 | +from celery import shared_task |
| 2 | +from haystack import signals |
| 3 | +from haystack.exceptions import NotHandled |
| 4 | + |
| 5 | + |
| 6 | +@shared_task |
| 7 | +def update_search_index(action, instance_pk, app_label, model_name): |
| 8 | + """Async task to update search index""" |
| 9 | + from django.apps import apps |
| 10 | + |
| 11 | + try: |
| 12 | + model_class = apps.get_model(app_label, model_name) |
| 13 | + instance = model_class.objects.get(pk=instance_pk) |
| 14 | + |
| 15 | + from haystack import connection_router, connections |
| 16 | + |
| 17 | + using_backends = connection_router.for_write(instance=instance) |
| 18 | + |
| 19 | + for using in using_backends: |
| 20 | + try: |
| 21 | + index = connections[using].get_unified_index().get_index(model_class) |
| 22 | + if action == "update": |
| 23 | + index.update_object(instance, using=using) |
| 24 | + elif action == "delete": |
| 25 | + index.remove_object(instance, using=using) |
| 26 | + except NotHandled: |
| 27 | + pass |
| 28 | + |
| 29 | + except model_class.DoesNotExist: |
| 30 | + pass # Instance was deleted |
| 31 | + except Exception as e: |
| 32 | + # Log error but don't fail |
| 33 | + import logging |
| 34 | + |
| 35 | + logging.error(f"Search index update failed: {e}") |
| 36 | + |
| 37 | + |
| 38 | +class CelerySignalProcessor(signals.BaseSignalProcessor): |
| 39 | + """Signal processor that queues updates to Celery""" |
| 40 | + |
| 41 | + def handle_save(self, sender, instance, **kwargs): |
| 42 | + update_search_index.delay( |
| 43 | + "update", instance.pk, instance._meta.app_label, instance._meta.model_name |
| 44 | + ) |
| 45 | + |
| 46 | + def handle_delete(self, sender, instance, **kwargs): |
| 47 | + update_search_index.delay( |
| 48 | + "delete", instance.pk, instance._meta.app_label, instance._meta.model_name |
| 49 | + ) |
0 commit comments