Skip to content

Commit 14eda5f

Browse files
author
David Linko
committed
working transcoder fixed issue with threading and DB
1 parent 8a49c42 commit 14eda5f

File tree

5 files changed

+97
-73
lines changed

5 files changed

+97
-73
lines changed

anms-core/anms/init_adms.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from anms.routes.adms.adm import handle_adm
3232
from anms.shared.config_utils import ConfigBuilder
3333
from anms.shared.opensearch_logger import OpenSearchLogger
34+
from anms.shared.transmogrifier import TRANSMORGIFIER
3435

3536

3637
logger = OpenSearchLogger(__name__).logger
@@ -67,17 +68,10 @@ async def import_adms():
6768
logger.error('ADM %s handling failed: %s', adm_file.norm_name, err)
6869
logger.debug('%s', traceback.format_exc())
6970

71+
7072
# Notify the aricodec of startup
71-
config = ConfigBuilder.get_config()
72-
host = config.get('MQTT_HOST')
73-
port = config.get('MQTT_PORT')
74-
75-
logger.info('Connecting to MQTT broker %s to notify aricodec', host)
76-
client = mqtt.client.Client()
77-
client.connect(host, port)
78-
msg = client.publish('aricodec/reload', b'')
79-
msg.wait_for_publish()
80-
client.disconnect()
73+
TRANSMORGIFIER.reload()
74+
8175

8276
logger.info('Startup finished')
8377

anms-core/anms/routes/adms/adm.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,17 @@
2929
from pydantic import BaseModel
3030
import io
3131
import traceback
32-
from typing import TextIO
3332

3433
# Internal modules
35-
from sqlalchemy import delete, select, and_
36-
from sqlalchemy.engine import Result
37-
from sqlalchemy.ext.asyncio import AsyncSession
34+
from sqlalchemy import delete, and_
3835

3936
from anms.models.relational.adms import (adm_data, data_model_view)
40-
from anms.models.relational.adms.data_model_view import DataModel as ADM
4137

4238
from anms.routes.adms.adm_compare import (AdmCompare)
4339
from anms.shared.opensearch_logger import OpenSearchLogger
44-
from anms.shared.mqtt_client import MQTT_CLIENT
45-
from anms.models.relational import get_async_session, get_session
40+
41+
from anms.shared.transmogrifier import TRANSMORGIFIER
42+
from anms.models.relational import get_async_session
4643
from anms.components.schemas.adm import DataModelSchema
4744
import ace
4845
from camp.generators import (create_sql)
@@ -296,7 +293,7 @@ async def update_adm(file: UploadFile, request: Request):
296293
if error_message:
297294
raise Exception(error_message)
298295
# Notify the transcoder
299-
MQTT_CLIENT.publish('aricodec/reload', adm_file.norm_name)
296+
TRANSMORGIFIER.reload(adm_file.norm_name)
300297
logger.info(f"{info_message} adm file: {file.filename} successfully")
301298
except Exception as err:
302299
logger.error(f"{sql_dialect} execution error: {err.args}")

anms-core/anms/routes/transcoder.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,22 +96,27 @@ async def transcoder_put_cbor_await(cbor: str):
9696

9797
def _transcoder_put_cbor(input_cbor):
9898
transcoder_log_id = None
99+
send_to_transcode = False
99100
with get_session() as session:
100101
curr_uri = TranscoderLog.query.filter(or_(TranscoderLog.input_string==input_cbor, TranscoderLog.cbor==input_cbor)).first()
102+
101103
if curr_uri is None:
102104
c1 = TranscoderLog(input_string=input_cbor, parsed_as='pending')
103105
session.add(c1)
104106
session.flush()
105107
session.refresh(c1)
106108
transcoder_log_id = c1.transcoder_log_id
107109
session.commit()
108-
status = "Submitted ARI to transcoder"
109-
TRANSMORGIFIER.transcode(input_cbor)
110+
send_to_transcode = True
110111
else:
111112
# the input_ari has already been submitted
112113
status = "ARI previously submitted, check log"
113114
transcoder_log_id = curr_uri.transcoder_log_id
114-
115+
116+
if(send_to_transcode):
117+
status = "Submitted ARI to transcoder"
118+
TRANSMORGIFIER.transcode(input_cbor)
119+
115120
return {"id": transcoder_log_id, "status": status}
116121

117122

@@ -142,6 +147,7 @@ def transcoder_incoming_str(input_ari: str):
142147
def _transcoder_put_str(input_ari: str):
143148
input_ari = input_ari.strip()
144149
transcoder_log_id = None
150+
send_to_transcode = False
145151
with get_session() as session:
146152
curr_uri = TranscoderLog.query.filter(or_(TranscoderLog.input_string==input_ari,TranscoderLog.ari==input_ari, TranscoderLog.cbor==input_ari)).first()
147153
if curr_uri is None:
@@ -151,17 +157,19 @@ def _transcoder_put_str(input_ari: str):
151157
session.refresh(c1)
152158
transcoder_log_id = c1.transcoder_log_id
153159
session.commit()
154-
status = "Submitted ARI to transcoder"
155-
TRANSMORGIFIER.transcode(input_ari)
160+
send_to_transcode = True
156161
else:
157162
# the input_ari has already been submitted
158163
status = "ARI previously submitted, check log"
159164
transcoder_log_id = curr_uri.transcoder_log_id
165+
166+
if(send_to_transcode):
167+
status = "Submitted ARI to transcoder"
168+
TRANSMORGIFIER.transcode(input_ari)
160169

161170
return {"id": transcoder_log_id, "status": status}
162171

163172

164-
165173
# PUT /ui/incoming_send/str Body is str ARI to send to transcoder
166174
@router.put("/ui/incoming_send/str", status_code=status.HTTP_200_OK)
167175
async def transcoder_send_ari_str(eid: str, ari: str):

anms-core/anms/shared/mqtt_client.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,17 @@ def __init__(self, config):
4141
return
4242

4343
# Create MQTT Client
44-
client = mqtt.client.Client("Core_MQTT_Client", clean_session=False)
45-
client.on_connect = self._on_connect
46-
client.on_message = self._on_message
47-
logger.info('Connecting to MQTT broker at %s:%s', host, port)
48-
client.connect_async(host, port, keepalive=60)
49-
self.client = client
50-
checking_child = Thread(target=self._check_pending)
51-
checking_child.daemon = True
52-
checking_child.start()
53-
client.loop_start()
44+
if config.Transcoder != "Internal":
45+
client = mqtt.client.Client("Core_MQTT_Client", clean_session=False)
46+
client.on_connect = self._on_connect
47+
client.on_message = self._on_message
48+
logger.info('Connecting to MQTT broker at %s:%s', host, port)
49+
client.connect_async(host, port, keepalive=60)
50+
self.client = client
51+
checking_child = Thread(target=self._check_pending)
52+
checking_child.daemon = True
53+
checking_child.start()
54+
client.loop_start()
5455

5556
def publish(self, *args, **kwargs):
5657
''' If connected, pass through a publish request. '''

anms-core/anms/shared/transmogrifier.py

Lines changed: 63 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
from anms.shared.config import ConfigBuilder
2626
from anms.shared.logger import Logger
27-
from anms.shared.mqtt_client import MQTT_CLIENT
27+
import anms.shared.mqtt_client
2828
from anms.shared.opensearch_logger import OpenSearchLogger
2929
from anms.models.relational import get_session
3030
from anms.models.relational.transcoder_log import TranscoderLog
@@ -36,44 +36,57 @@
3636
import sqlalchemy
3737

3838
config = ConfigBuilder.get_config()
39-
LOGGER = OpenSearchLogger(__name__, log_console=True)
39+
LOGGER = OpenSearchLogger(__name__, log_console=True).logger
4040

4141

4242
# depending on what the config is for core will either use a MQTT server to send off commands or
4343
# use an internal
4444
class Transmorgifier:
4545

4646
''' The Transmogifier that can be configured to use an external or internal translator. '''
47+
# args = config
4748
def __init__(self, args):
4849
# if the transcoding in internal to core
49-
if config.Transcoder == "internal":
50-
LOGGER.info('Connecting to SQL DB at %s', args.db_uri)
51-
self._dbeng = sqlalchemy.create_engine(args.db_uri)
52-
self._adms = ace.AdmSet(cache_dir=False)
50+
LOGGER.info(config.Transcoder)
51+
if config.Transcoder == "Internal":
52+
db_uri = f"postgresql://{config.DB_USER}:{config.DB_PASS}@{config.DB_HOST}/{config.DB_CHROOT}"
53+
LOGGER.info(f'Connecting to SQL DB at {db_uri}')
54+
self._dbeng = sqlalchemy.create_engine(db_uri)
55+
5356
self.transcode = self._transcode_internal
5457
self.reload = self._reload_internal
5558
self._adm_reload(None)
5659
else:
5760
# setting up the MQTT server instead
61+
self.MQTT_CLIENT = anms.shared.mqtt_client.MQTT_CLIENT
5862
self.transcode = self._transcode_mqtt
5963
self.reload = self._reload_mqtt
6064

6165
def _transcode_mqtt(self, input):
6266
msg = json.dumps({'uri': input})
63-
LOGGER.info('PUBLISH to transcode/CoreFacing/Outgoing, msg = %s' % msg)
64-
MQTT_CLIENT.publish("transcode/CoreFacing/Outgoing", msg)
67+
LOGGER.info(f'PUBLISH to transcode/CoreFacing/Outgoing, msg = {msg}')
68+
self.MQTT_CLIENT.publish("transcode/CoreFacing/Outgoing", msg)
6569

6670
def _transcode_internal(self, input):
71+
self._ace_transcode(input)
72+
73+
# picking up any stray items that didnt get translated
74+
pending_uris = TranscoderLog.query.filter_by(parsed_as='pending').all()
75+
for entrys in pending_uris:
76+
try:
77+
self._ace_transcode(entrys.input_string)
78+
except Exception as err:
79+
LOGGER.error('Failed to process pending entry: %s', err)
80+
81+
def _ace_transcode(self, input):
6782
# result object to fill in
6883
res_obj = {}
6984
res_obj['uri'] = ""
7085
res_obj['cbor'] = ""
71-
86+
adms = ace.AdmSet()
7287
try:
73-
req_obj = input
74-
LOGGER.info('Request %s', req_obj)
75-
76-
in_text = req_obj['uri'].strip()
88+
LOGGER.info(f'Request {input}')
89+
in_text = input.strip()
7790
res_obj['inputString'] = in_text
7891
in_lower = in_text.casefold()
7992
if in_lower.startswith('ari:0x') or in_lower.startswith('0x'):
@@ -87,9 +100,9 @@ def _transcode_internal(self, input):
87100
in_bytes = ace.cborutil.from_hexstr(in_text)
88101
dec = ace.ari_cbor.Decoder()
89102
ari = dec.decode(io.BytesIO(in_bytes))
90-
LOGGER.debug('decoded as ARI %s', ari)
91-
# ace.nickname.Converter(ace.nickname.Mode.FROM_NN, self._admsSession(self._dbeng), True)(ari)
92-
ari = ace.nickname.Converter(ace.nickname.Mode.FROM_NN, self._adms.db_session(), False)(ari)
103+
LOGGER.debug(f'decoded as ARI {ari}')
104+
# ace.nickname.Converter(ace.nickname.Mode.FROM_NN, admsSession(self._dbeng), True)(ari)
105+
ari = ace.nickname.Converter(ace.nickname.Mode.FROM_NN, adms.db_session(), False)(ari)
93106

94107
except Exception as err:
95108
raise RuntimeError(f"Error decoding from `{in_text}`: {err}") from err
@@ -104,7 +117,7 @@ def _transcode_internal(self, input):
104117
out_text = buf.getvalue()
105118
if not out_text.startswith('ari:'):
106119
out_text = 'ari:' + out_text
107-
LOGGER.debug('encoded as text %s', out_text)
120+
LOGGER.debug(f'encoded as text {out_text}')
108121
except Exception as err:
109122
raise RuntimeError(f"Error encoding from {ari}: {err}") from err
110123
res_obj['uri'] = out_text
@@ -116,8 +129,8 @@ def _transcode_internal(self, input):
116129
try:
117130
dec = ace.ari_text.Decoder()
118131
ari = dec.decode(io.StringIO(in_text))
119-
LOGGER.debug('decoded as ARI %s', ari)
120-
ari = ace.nickname.Converter(ace.nickname.Mode.FROM_NN, self._adms.db_session(), False)(ari)
132+
LOGGER.debug(f'decoded as ARI {ari}')
133+
ari = ace.nickname.Converter(ace.nickname.Mode.FROM_NN, adms.db_session(), False)(ari)
121134
except Exception as err:
122135
raise RuntimeError(f"Error decoding from `{in_text}`: {err}") from err
123136

@@ -130,7 +143,7 @@ def _transcode_internal(self, input):
130143
out_text = buf.getvalue()
131144
if not out_text.startswith('ari:'):
132145
out_text = 'ari:' + out_text
133-
LOGGER.debug('encoded as text %s', out_text)
146+
LOGGER.debug(f'encoded as text {out_text}')
134147
except Exception as err:
135148
raise RuntimeError(f"Error encoding from {ari}: {err}") from err
136149

@@ -143,37 +156,47 @@ def _transcode_internal(self, input):
143156
enc.encode(ari, buf)
144157

145158
hex_str = ace.cborutil.to_hexstr(buf.getvalue())
146-
LOGGER.info('encoded as binary %s', hex_str)
159+
LOGGER.info(f'encoded as binary {hex_str}')
147160
except Exception as err:
148161
raise RuntimeError(f"Error encoding from {ari}: {err}") from err
149162
res_obj['cbor'] = hex_str
150-
151163
except Exception as err:
152164
res_obj['ari'] = f'Failed to process: {err}'
153165
res_obj['parsedAs'] = 'ERROR'
154-
LOGGER.error('Failed to process: %s', err)
155-
LOGGER.info('Traceback:\n%s', traceback.format_exc())
156-
157-
LOGGER.info('Response %s', res_obj)
158-
159-
# client.publish('transcode/CodexFacing/Outgoing', json.dumps(res_obj))
160-
# just log it back into the database
161-
166+
LOGGER.error(f'Failed to process: {err}')
167+
LOGGER.info(f'Traceback:\n{traceback.format_exc()}')
168+
162169
# store in transcoder database
163170
with get_session() as session:
164-
session.query(TranscoderLog).filter(TranscoderLog.input_string == res_obj['inputString']). \
165-
update({
171+
session.query(TranscoderLog).filter(TranscoderLog.input_string == input).update({
166172
'parsed_as': res_obj['parsedAs'],
167173
'ari': json.dumps(res_obj['ari']),
168174
'cbor': res_obj['cbor'],
169175
'uri': res_obj['uri']
170176
})
171-
session.commit()
177+
session.commit()
178+
LOGGER.info(f'Response {res_obj}')
179+
180+
# client.publish('transcode/CodexFacing/Outgoing', json.dumps(res_obj))
181+
# just log it back into the database
182+
183+
184+
185+
def _reload_mqtt(self,adm_name=None):
186+
config = ConfigBuilder.get_config()
187+
host = config.get('MQTT_HOST')
188+
port = config.get('MQTT_PORT')
172189

173-
def _reload_mqtt(self,adm_name):
190+
LOGGER.info('Connecting to MQTT broker %s to notify aricodec' % host)
191+
192+
msg = self.MQTT_CLIENT.publish('aricodec/reload', b'')
193+
if adm_name:
194+
msg = self.MQTT_CLIENT.publish('aricodec/reload', f'{adm_name}')
195+
msg.wait_for_publish()
196+
174197
return
175198

176-
def _reload_internal(self, adm_name):
199+
def _reload_internal(self, adm_name=None):
177200
try:
178201
self._adm_reload(adm_name)
179202
except Exception as err:
@@ -204,17 +227,18 @@ def _adm_reload(self, adm_name):
204227
for row in curs.all():
205228
self._handle_adm(*row)
206229

207-
LOGGER.info('ADMS present for: %s', self._adms.names())
230+
208231

209232
def _handle_adm(self, adm_name, timestamp, data):
210-
LOGGER.info('Handling ADM: %s', adm_name)
233+
LOGGER.info(f'Handling ADM:{adm_name}')
211234
LOGGER.info(type(data))
212235
# LOGGER.info(data.tos())
213236

214237
io_buffer = io.StringIO(data.tobytes().decode('utf-8'))
215-
216-
self._adms.load_from_data(io_buffer)
238+
adms = ace.AdmSet()
239+
adms.load_from_data(io_buffer)
217240
LOGGER.info('Handling finished')
241+
LOGGER.info('ADMS present for: %s', adms.names())
218242

219243

220244
# SIGNALTON transmorgifier

0 commit comments

Comments
 (0)