@@ -22,8 +22,9 @@ class SyncProcessor {
2222 const CRON_LOCK_TIMEOUT = 9 * MINUTE_IN_SECONDS ;
2323
2424 const MSG_ITEM_NO_DATA = "Item has no data " ;
25+ const SUBJECT_ITEM_MAX_ATTEMPTS = "SyncProcessor: Max attempts reached " ;
2526 const MSG_ITEM_MAX_ATTEMPTS = "Item was processed more than " . self ::MAX_SAVE_ATTEMPTS . " times. " .
26- "Has been removed from the queue. " ;
27+ "Has been removed from the queue. " ;
2728
2829 private CrmDao $ crm_dao ;
2930 private QueueDao $ queue ;
@@ -40,16 +41,18 @@ public static function process_queue() {
4041 $ processor = new self ();
4142
4243 if (!$ processor ->can_sync_to_crm && !$ processor ->can_sync_to_mailchimp ) {
43- Util::remove_cron ( self ::CRON_HOOK_CRM_MC_SAVE );
44+ Util::remove_cron (self ::CRON_HOOK_CRM_MC_SAVE );
45+ Util::debug_log ("msg=No CRM or Mailchimp API key found, removing cron job " );
4446 return ;
4547 }
4648
4749 if (!$ processor ->acquire_lock ()) {
50+ Util::debug_log ("msg=Sync processor already running, skipping this execution " );
4851 return ;
4952 }
5053
5154 try {
52- $ processor ->queue = new QueueDao (SyncEnqueuer:: QUEUE_KEY );
55+ $ processor ->queue = self :: get_queue ( );
5356 $ items = array_slice ($ processor ->queue ->get_all (), 0 , self ::MAX_BATCH_SIZE );
5457
5558 if (empty ($ items )) {
@@ -63,36 +66,40 @@ public static function process_queue() {
6366 $ first_item = $ items [0 ] ?? null ;
6467 $ submission = ($ first_item instanceof CrmQueueItem) ? $ first_item ->get_submission_id () : "(submission id not found) " ;
6568 CrmSaver::send_permanent_error_notification ($ submission , $ e ->getMessage ());
69+ Util::debug_log ("submissionId= {$ submission } msg=Failed to create CRM DAO. Skipping. " );
6670 return ;
6771 }
6872 }
6973
7074 /** @var CrmQueueItem $item */
7175 foreach ($ items as $ item ) {
72- if ($ processor ->too_many_attempts ($ item )) {
76+ if ($ processor ->too_many_attempts ( $ item ) || $ processor -> too_recent_attempt ($ item )) {
7377 continue ;
7478 }
7579
7680 if (!$ item ->has_data ()) {
7781 Util::report_form_error ('sync queue process ' , $ item , new \Exception (self ::MSG_ITEM_NO_DATA ), null );
78- $ processor ->update_queue ($ item, true );
82+ $ processor ->remove_from_queue ($ item );
7983 continue ;
8084 }
8185
8286 try {
83- $ processed = $ processor ->process_item ($ item );
84- $ processor ->update_queue ($ item , $ processed );
85- }
86- catch (CrmMaxSyncsException $ e ) {
87- Util::debug_log ("submissionId= {$ item ->get_submission_id ()} msg=Too many CRM syncs per run. Ending this run. " );
87+ if ($ processor ->process_item ($ item )) {
88+ $ processor ->remove_from_queue ($ item );
89+ } else {
90+ $ processor ->update_queue ($ item );
91+ }
92+ } catch (CrmMaxSyncsException $ e ) {
93+ Util::debug_log ("submissionId= " . $ item ->get_submission_id () . " msg=Too many CRM syncs per run. Ending this run. " );
8894 return ;
8995 } catch (\Exception $ e ) {
9096 Util::report_form_error ('sync queue process ' , $ item , $ e , null );
91- $ processor ->update_queue ($ item, false );
97+ $ processor ->update_queue ($ item );
9298 }
9399 }
94100
95101 $ processor ->schedule_next_batch ();
102+ Util::debug_log ("msg=Processed batch of {$ processor ->crm_sync_count } CRM syncs " );
96103 } finally {
97104 $ processor ->release_lock ();
98105 }
@@ -161,13 +168,30 @@ private function restructure_field_data(array $crm_field_data_objects): array {
161168 */
162169 private function too_many_attempts (CrmQueueItem $ item ) {
163170 if ( $ item ->get_attempts () >= self ::MAX_SAVE_ATTEMPTS ) {
164- Util::debug_log ( "submissionId= {$ item ->get_submission_id ()} msg=Too many attempts. Skipping. " );
171+ $ this ->remove_from_queue ($ item );
172+ Util::send_mail_to_admin (
173+ self ::SUBJECT_ITEM_MAX_ATTEMPTS . "id= " . $ item ->get_submission_id (),
174+ self ::MSG_ITEM_MAX_ATTEMPTS . "\n\n" . json_encode ($ item ->get_data ())
175+ );
176+ Util::debug_log ("submissionId= " . $ item ->get_submission_id () . " msg=Too many attempts. Removed from queue. " );
165177 return true ;
166178 }
167179
168- if ( $ item ->last_attempt_seconds_ago () &&
169- $ item ->last_attempt_seconds_ago () < self ::MIN_ATTEMPT_TIMEOUT ) {
170- Util::debug_log ( "submissionId= {$ item ->get_submission_id ()} msg=Last attempt only {$ item ->last_attempt_seconds_ago ()} seconds ago. Skipping. " );
180+ return false ;
181+ }
182+
183+ /**
184+ * Check if the item should be skipped because it was processed too recently
185+ *
186+ * @param CrmQueueItem $item The item to check
187+ * @return bool True if the item should be skipped, false otherwise
188+ */
189+ private function too_recent_attempt (CrmQueueItem $ item ) {
190+ if (
191+ $ item ->last_attempt_seconds_ago () &&
192+ $ item ->last_attempt_seconds_ago () < self ::MIN_ATTEMPT_TIMEOUT
193+ ) {
194+ Util::debug_log ("submissionId= {$ item ->get_submission_id ()} msg=Last attempt only {$ item ->last_attempt_seconds_ago ()} seconds ago. Skipping. " );
171195 return true ;
172196 }
173197
@@ -192,7 +216,7 @@ private function process_item(CrmQueueItem $item): bool {
192216 if ($ this ->can_sync_to_crm ) {
193217 $ match = $ this ->crm_dao ->match ($ crm_field_data_objects );
194218 if ($ this ->is_valid_crm_match ($ match ) || $ new_contacts_to_crm ) {
195- if ($ this ->crm_sync_count >= self ::CRM_MAX_SYNCS_PER_RUN ) {
219+ if ($ this ->crm_sync_count >= self ::CRM_MAX_SYNCS_PER_RUN ) {
196220 throw new CrmMaxSyncsException ();
197221 }
198222 $ this ->crm_sync_count ++;
@@ -237,19 +261,11 @@ private function is_valid_crm_match(array $match): bool {
237261 * Update the queue based on processing result
238262 *
239263 * @param CrmQueueItem $item Queue item that was processed
240- * @param bool $processed Whether the item was successfully processed
241264 */
242- private function update_queue (CrmQueueItem $ item , bool $ remove ): void {
243- if ($ remove ) {
244- $ this ->remove_from_queue ($ item );
245- } else if ($ item ->get_attempts () >= self ::MAX_SAVE_ATTEMPTS ) {
246- $ this ->remove_from_queue ($ item );
247- Util::report_form_error ('item max attempts ' , $ item , new \Exception (self ::MSG_ITEM_MAX_ATTEMPTS ), null );
248- } else {
249- $ item ->add_attempt ();
250- $ this ->queue ->push_if_not_in_queue ($ item );
251- Util::debug_log ("submissionId= {$ item ->get_submission_id ()} msg=Processing failed, will retry. attempts= {$ item ->get_attempts ()}" );
252- }
265+ private function update_queue (CrmQueueItem $ item ): void {
266+ $ item ->add_attempt ();
267+ $ this ->queue ->update_and_move_to_end ($ item );
268+ Util::debug_log ("submissionId= {$ item ->get_submission_id ()} msg=Processing failed, will retry. attempts= {$ item ->get_attempts ()}" );
253269 }
254270
255271 /**
@@ -258,7 +274,7 @@ private function update_queue(CrmQueueItem $item, bool $remove): void {
258274 * @param CrmQueueItem $item The item to remove
259275 */
260276 private function remove_from_queue (CrmQueueItem $ item ): void {
261- $ this ->queue ->filter (function ($ q_item ) use ($ item ) {
277+ $ this ->queue ->filter (function ($ q_item ) use ($ item ) {
262278 return $ q_item ->get_submission_id () !== $ item ->get_submission_id ();
263279 });
264280 }
0 commit comments