Skip to content

Commit 05050b4

Browse files
d-linkoDavid Linko
andauthored
updated to lastest tool and new SQL schema (#249)
* updated to lastest tool and new SQL schema * fixed encoding nonces in URLs * updated to safer literal_eval --------- Co-authored-by: David Linko <david.linko@jhuapl.edu>
1 parent cd32a52 commit 05050b4

File tree

23 files changed

+119
-140
lines changed

23 files changed

+119
-140
lines changed

anms-core/anms/components/schemas/ARIs/registered_agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
# Shared properties
3333
class RegisteredAgentBase(BaseModel):
34-
agent_id_string: Optional[str] = None
34+
agent_endpoint_uri: Optional[str] = None
3535
first_registered: Optional[datetime] = None
3636
last_registered: Optional[datetime] = None
3737

anms-core/anms/components/schemas/ARIs/rpt_entry.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929

3030
# Shared properties
3131
class RptEntryBase(BaseModel):
32-
reference_time: Optional[str] = None
33-
agent_id: Optional[str] = None
34-
correlator_nonce: Optional[str] = None
32+
class Config:
33+
arbitrary_types_allowed = True
34+
35+
ari_rptset_id: Optional[str] = None
36+
reference_time: Optional[datetime] = None
3537
report_list: Optional[str] = None
36-
ari_rptset_id: Optional[int] = None
38+
agent_id: Optional[int] = None
3739

3840

3941
class RptEntryBaseInDBBase(RptEntryBase):

anms-core/anms/models/relational/execution_set.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
class ExecutionSet(Model):
3636
__tablename__ = 'vw_execution_set'
3737
execution_set_id = Column(Integer, primary_key=True)
38-
correlator_nonce = Column(Integer)
38+
nonce_cbor = Column(LargeBinary)
3939
use_desc = Column(String)
4040
agent_id = Column(String)
4141
num_entries = Column(Integer)
@@ -47,7 +47,7 @@ def __repr__(self) -> str:
4747
def as_dict(self) -> Dict[str, Any]:
4848
dict_obj = {
4949
'execution_set_id': getattr(self, 'execution_set_id'),
50-
'correlator_nonce': getattr(self, 'correlator_nonce'),
50+
'nonce_cbor': getattr(self, 'nonce_cbor'),
5151
'use_desc': getattr(self, 'use_desc'),
5252
'agent_id': getattr(self, 'agent_id'),
5353
'num_entries': getattr(self, 'num_entries'),

anms-core/anms/models/relational/registered_agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class RegisteredAgent(Model):
3737
__tablename__ = 'registered_agents'
3838

3939
registered_agents_id = Column(Integer, primary_key=True)
40-
agent_id_string = Column(
40+
agent_endpoint_uri = Column(
4141
String(128),
4242
default='ipn:0.0',
4343
unique=True,

anms-core/anms/models/relational/report.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,18 @@
3535
class Report(Model):
3636
__tablename__ = 'ari_rptset'
3737
ari_rptset_id = Column(Integer, primary_key=True)
38-
correlator_nonce = Column(Integer)
38+
nonce_cbor = Column(LargeBinary)
3939
reference_time = Column(Integer)
4040
report_list = Column(String)
4141
report_list_cbor = Column(LargeBinary)
42-
agent_id = Column(String)
42+
agent_id = Column(Integer)
4343
def __repr__(self) -> str:
4444
return self.as_dict().__repr__()
4545

4646
def as_dict(self) -> Dict[str, Any]:
4747
dict_obj = {
4848
'ari_rptset_id': getattr(self, 'ari_rptset_id'),
49-
'correlator_nonce': getattr(self, 'correlator_nonce'),
49+
'nonce_cbor': getattr(self, 'nonce_cbor'),
5050
'reference_time': getattr(self, 'reference_time'),
5151
'report_list': getattr(self, 'report_list'),
5252
'report_list_cbor': getattr(self, 'report_list_cbor'),

anms-core/anms/routes/ARIs/agents.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ async def registered_agent_by_id(registered_agents_id: int):
6262
return result.one_or_none()
6363

6464

65-
@router.get("/name/{agent_id_string}", status_code=status.HTTP_200_OK, response_model=ARIs.RegisteredAgent)
66-
async def registered_agent_by_name(agent_id_string: str):
67-
stmt = select(RegisteredAgent).where(RegisteredAgent.agent_id_string == agent_id_string)
65+
@router.get("/name/{agent_endpoint_uri}", status_code=status.HTTP_200_OK, response_model=ARIs.RegisteredAgent)
66+
async def registered_agent_by_name(agent_endpoint_uri: str):
67+
stmt = select(RegisteredAgent).where(RegisteredAgent.agent_endpoint_uri == agent_endpoint_uri)
6868
async with get_async_session() as session:
6969
result: Result = await session.scalars(stmt)
7070
return result.one_or_none()
@@ -75,7 +75,7 @@ async def paged_registered_agents(query: str, params: Params = Depends()):
7575
async with get_async_session() as session:
7676
query = '%' + query + '%'
7777
return await paginate(session, select(RegisteredAgent).where(or_(
78-
RegisteredAgent.agent_id_string.ilike(query),
78+
RegisteredAgent.agent_endpoint_uri.ilike(query),
7979
RegisteredAgent.first_registered.cast(String).ilike(query),
8080
RegisteredAgent.last_registered.cast(String).ilike(query)
8181
)).order_by(RegisteredAgent.registered_agents_id), params)

anms-core/anms/routes/ARIs/reports.py

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,31 @@
2222
# subcontract 1658085.
2323
#
2424
from typing import List
25-
import re
25+
import ast
26+
2627
from fastapi import APIRouter, Depends
2728
from fastapi import status
2829
from fastapi_pagination import Page, Params
2930
from fastapi_pagination.ext.async_sqlalchemy import paginate
3031
from sqlalchemy import select, and_
3132
from sqlalchemy.engine import Result
32-
from io import StringIO
33+
3334
from urllib.parse import unquote
3435

3536
from anms.components.schemas import ARIs
3637
from anms.models.relational import get_async_session, get_session
3738

3839
from anms.models.relational.report import Report
3940
from anms.models.relational.execution_set import ExecutionSet
41+
from anms.models.relational.registered_agent import RegisteredAgent
42+
4043
from anms.shared.opensearch_logger import OpenSearchLogger
4144
import io
4245

4346
import anms.routes.transcoder as transcoder
4447

4548
# for handling report set and exec set
4649
import ace
47-
import ace.models
48-
4950

5051
logger = OpenSearchLogger(__name__, log_console=True)
5152

@@ -69,47 +70,70 @@ async def all_report_name():
6970

7071
@router.get("/entry/name/{agent_id}", status_code=status.HTTP_200_OK, response_model=list,
7172
tags=["REPORTS"])
72-
async def report_def_by_id(agent_id: str):
73+
async def report_def_by_id(agent_id: int):
7374
# select all reports belonging to the agent
74-
agent_id = agent_id.strip()
7575
final_res = []
76+
agent_id_str = ""
7677
stmt = select(Report).where(Report.agent_id == agent_id)
78+
agent_id_stmt = select(RegisteredAgent).where(RegisteredAgent.registered_agents_id == agent_id)
7779
async with get_async_session() as session:
7880
result: Result = await session.scalars(stmt)
81+
# Execution set uses URI as agent_id
82+
result_agent: Result = await session.scalars(agent_id_stmt)
83+
agent_id_str = result_agent.one_or_none()
84+
agent_id_str = agent_id_str.agent_endpoint_uri
7985
for res in result.all():
8086
# select from exec_set
81-
correlator_nonce = res.correlator_nonce
82-
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id, ExecutionSet.correlator_nonce == correlator_nonce) )
83-
result: Result = await session.scalars(stmt)
84-
exc_set = result.all()
85-
for res in exc_set:
86-
ari_val = ""
87-
if(res):
88-
ari_val = await transcoder.transcoder_put_cbor_await("0x"+res.entries.hex())
89-
ari_val = ari_val['data']
90-
addition = {'exec_set': ari_val,'correlator_nonce':correlator_nonce}
91-
if addition not in final_res:
92-
final_res.append(addition)
93-
87+
try:
88+
nonce_cbor = res.nonce_cbor
89+
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id_str, ExecutionSet.nonce_cbor == nonce_cbor) )
90+
result: Result = await session.scalars(stmt)
91+
exc_set = result.all()
92+
for res in exc_set:
93+
ari_val = ""
94+
if(res):
95+
hex_str = res.entries.hex()
96+
hex_str = "0x"+hex_str.upper()
97+
ari_val = await transcoder.transcoder_put_cbor_await(hex_str)
98+
ari_val = ari_val['data']
99+
logger.info(str(nonce_cbor))
100+
addition = {'exec_set': ari_val,'nonce_cbor':str(nonce_cbor)}
101+
if addition not in final_res:
102+
final_res.append(addition)
103+
except Exception as e:
104+
logger.error(f"Error {e}, while processing nonce:{nonce_cbor} for agent: {agent_id_str}")
105+
94106
return final_res
95107

96108

97109
# entries tabulated returns header and values in correct order
98-
@router.get("/entries/table/{agent_id}/{correlator_nonce}", status_code=status.HTTP_200_OK,
110+
@router.get("/entries/table/{agent_id}/{nonce_cbor}", status_code=status.HTTP_200_OK,
99111
response_model=list)
100-
async def report_ac(agent_id: str, correlator_nonce: int):
101-
agent_id = agent_id.strip()
102-
final_res = []
112+
async def report_ac(agent_id: int, nonce_cbor: str):
113+
103114
ari = None
104115
dec = ace.ari_cbor.Decoder()
105-
buf = io.StringIO()
116+
enc = ace.ari_text.Encoder()
117+
try:
118+
nonce_cbor = ast.literal_eval(nonce_cbor)
119+
except Exception as e:
120+
logger.error(f"{e} while processing nonce")
121+
return []
122+
123+
agent_id_str =""
124+
agent_id_stmt = select(RegisteredAgent).where(RegisteredAgent.registered_agents_id == agent_id)
125+
async with get_async_session() as session:
126+
result_agent: Result = await session.scalars(agent_id_stmt)
127+
agent_id_str = result_agent.one_or_none()
128+
agent_id_str = agent_id_str.agent_endpoint_uri
129+
106130
# Load in adms
107131
# get command that made the report as first entry
108-
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id, ExecutionSet.correlator_nonce == correlator_nonce) )
132+
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id_str, ExecutionSet.nonce_cbor == nonce_cbor) )
109133
async with get_async_session() as session:
110134
result: Result = await session.scalars(stmt)
111135

112-
# there should only be one execution per agent per correlator_nonce
136+
# there should only be one execution per agent per nonce_cbor
113137
# in the event that two occur pull the latest one
114138
result = result.all()
115139
exec_set_dir = {}
@@ -124,13 +148,13 @@ async def report_ac(agent_id: str, correlator_nonce: int):
124148
ari = dec.decode(io.BytesIO(in_bytes))
125149

126150
except Exception as err:
127-
logger.info(err)
151+
logger.error(err)
128152

129153
# current ARI should be an exection set
130154
if ari:
131155
if type(ari.value) == ace.ari.ExecutionSet:
132156
try:
133-
enc = ace.ari_text.Encoder()
157+
134158
# run through targets and their parameters to get all things parts translated
135159
for targ in ari.value.targets:
136160
buf = io.StringIO()
@@ -149,12 +173,12 @@ async def report_ac(agent_id: str, correlator_nonce: int):
149173
exec_set_dir[out_text_targ] = [exec_set_entry]
150174

151175
except Exception as err:
152-
logger.info(err)
176+
logger.error(err)
153177

154178

155179
# final_res.append(exec_set_entry)
156180
ari = None
157-
stmt = select(Report).where(and_(Report.agent_id == agent_id, Report.correlator_nonce == correlator_nonce) )
181+
stmt = select(Report).where(and_(Report.agent_id == agent_id, Report.nonce_cbor == nonce_cbor) )
158182
async with get_async_session() as session:
159183
result: Result = await session.scalars(stmt)
160184
for res in result.all():
@@ -188,9 +212,7 @@ async def report_ac(agent_id: str, correlator_nonce: int):
188212

189213
exec_set_dir[out_text].append(addition)
190214
except Exception as err:
191-
logger.error(err)
192-
193-
215+
logger.error(err)
194216

195217
return list(exec_set_dir.values())
196218

anms-core/anms/routes/adms/adm.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ async def handle_adm(admset: ace.AdmSet, adm_file: ace.models.AdmModule, session
156156
# Use CAmPython to generate sql
157157
out_path = "" # This is empty string since we don't need to write the generated sql to a file
158158
sql_dialect = 'pgsql'
159-
writer = create_sql.Writer(admset, adm_file, out_path, False, dialect=sql_dialect)
159+
writer = create_sql.Writer(admset, adm_file, out_path, sql_dialect)
160160
string_buffer = io.StringIO()
161161
writer.write(string_buffer)
162162

@@ -252,7 +252,7 @@ async def update_adm(file: UploadFile, request: Request):
252252
logger.info(f"{info_message} adm: {adm_file.norm_name}")
253253
out_path = "" # This is empty string since we don't need to write the generated sql to a file
254254
sql_dialect = 'pgsql'
255-
writer = create_sql.Writer(admset, adm_file, out_path, dialect=sql_dialect)
255+
writer = create_sql.Writer(admset, adm_file, out_path, sql_dialect)
256256
string_buffer = io.StringIO()
257257
try: # catching error in sql creation
258258
writer.write(string_buffer)

anms-core/anms/routes/agent_parameter.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,12 @@ def send_parameter(agent_id: int, agent_parameter_id: int, command_parameters: d
9999
try:
100100
session.add(in_stm)
101101
session.commit()
102-
# getting agent_id_string
103-
agent_id_string = session.execute(select(RegisteredAgent.agent_id_string).where(
102+
# getting agent_endpoint_uri
103+
agent_endpoint_uri = session.execute(select(RegisteredAgent.agent_endpoint_uri).where(
104104
RegisteredAgent.registered_agents_id == agent_id))
105-
agent_id_string = agent_id_string.one_or_none()[0]
105+
agent_endpoint_uri = agent_endpoint_uri.one_or_none()[0]
106106
err = process_command(agent_parameter_id, command_parameters,
107-
AGENT_PARAMETER.get_agent(),agent_id_string)
107+
AGENT_PARAMETER.get_agent(),agent_endpoint_uri)
108108
if not err:
109109
return err
110110
except IntegrityError:

0 commit comments

Comments
 (0)