Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

# Shared properties
class RegisteredAgentBase(BaseModel):
agent_id_string: Optional[str] = None
agent_endpoint_uri: Optional[str] = None
first_registered: Optional[datetime] = None
last_registered: Optional[datetime] = None

Expand Down
10 changes: 6 additions & 4 deletions anms-core/anms/components/schemas/ARIs/rpt_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@

# Shared properties
class RptEntryBase(BaseModel):
reference_time: Optional[str] = None
agent_id: Optional[str] = None
correlator_nonce: Optional[str] = None
class Config:
arbitrary_types_allowed = True

ari_rptset_id: Optional[str] = None
reference_time: Optional[datetime] = None
report_list: Optional[str] = None
ari_rptset_id: Optional[int] = None
agent_id: Optional[int] = None


class RptEntryBaseInDBBase(RptEntryBase):
Expand Down
4 changes: 2 additions & 2 deletions anms-core/anms/models/relational/execution_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
class ExecutionSet(Model):
__tablename__ = 'vw_execution_set'
execution_set_id = Column(Integer, primary_key=True)
correlator_nonce = Column(Integer)
nonce_cbor = Column(LargeBinary)
use_desc = Column(String)
agent_id = Column(String)
num_entries = Column(Integer)
Expand All @@ -47,7 +47,7 @@ def __repr__(self) -> str:
def as_dict(self) -> Dict[str, Any]:
dict_obj = {
'execution_set_id': getattr(self, 'execution_set_id'),
'correlator_nonce': getattr(self, 'correlator_nonce'),
'nonce_cbor': getattr(self, 'nonce_cbor'),
'use_desc': getattr(self, 'use_desc'),
'agent_id': getattr(self, 'agent_id'),
'num_entries': getattr(self, 'num_entries'),
Expand Down
2 changes: 1 addition & 1 deletion anms-core/anms/models/relational/registered_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class RegisteredAgent(Model):
__tablename__ = 'registered_agents'

registered_agents_id = Column(Integer, primary_key=True)
agent_id_string = Column(
agent_endpoint_uri = Column(
String(128),
default='ipn:0.0',
unique=True,
Expand Down
6 changes: 3 additions & 3 deletions anms-core/anms/models/relational/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@
class Report(Model):
__tablename__ = 'ari_rptset'
ari_rptset_id = Column(Integer, primary_key=True)
correlator_nonce = Column(Integer)
nonce_cbor = Column(LargeBinary)
reference_time = Column(Integer)
report_list = Column(String)
report_list_cbor = Column(LargeBinary)
agent_id = Column(String)
agent_id = Column(Integer)
def __repr__(self) -> str:
return self.as_dict().__repr__()

def as_dict(self) -> Dict[str, Any]:
dict_obj = {
'ari_rptset_id': getattr(self, 'ari_rptset_id'),
'correlator_nonce': getattr(self, 'correlator_nonce'),
'nonce_cbor': getattr(self, 'nonce_cbor'),
'reference_time': getattr(self, 'reference_time'),
'report_list': getattr(self, 'report_list'),
'report_list_cbor': getattr(self, 'report_list_cbor'),
Expand Down
8 changes: 4 additions & 4 deletions anms-core/anms/routes/ARIs/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ async def registered_agent_by_id(registered_agents_id: int):
return result.one_or_none()


@router.get("/name/{agent_id_string}", status_code=status.HTTP_200_OK, response_model=ARIs.RegisteredAgent)
async def registered_agent_by_name(agent_id_string: str):
stmt = select(RegisteredAgent).where(RegisteredAgent.agent_id_string == agent_id_string)
@router.get("/name/{agent_endpoint_uri}", status_code=status.HTTP_200_OK, response_model=ARIs.RegisteredAgent)
async def registered_agent_by_name(agent_endpoint_uri: str):
stmt = select(RegisteredAgent).where(RegisteredAgent.agent_endpoint_uri == agent_endpoint_uri)
async with get_async_session() as session:
result: Result = await session.scalars(stmt)
return result.one_or_none()
Expand All @@ -75,7 +75,7 @@ async def paged_registered_agents(query: str, params: Params = Depends()):
async with get_async_session() as session:
query = '%' + query + '%'
return await paginate(session, select(RegisteredAgent).where(or_(
RegisteredAgent.agent_id_string.ilike(query),
RegisteredAgent.agent_endpoint_uri.ilike(query),
RegisteredAgent.first_registered.cast(String).ilike(query),
RegisteredAgent.last_registered.cast(String).ilike(query)
)).order_by(RegisteredAgent.registered_agents_id), params)
88 changes: 55 additions & 33 deletions anms-core/anms/routes/ARIs/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,31 @@
# subcontract 1658085.
#
from typing import List
import re
import ast

from fastapi import APIRouter, Depends
from fastapi import status
from fastapi_pagination import Page, Params
from fastapi_pagination.ext.async_sqlalchemy import paginate
from sqlalchemy import select, and_
from sqlalchemy.engine import Result
from io import StringIO

from urllib.parse import unquote

from anms.components.schemas import ARIs
from anms.models.relational import get_async_session, get_session

from anms.models.relational.report import Report
from anms.models.relational.execution_set import ExecutionSet
from anms.models.relational.registered_agent import RegisteredAgent

from anms.shared.opensearch_logger import OpenSearchLogger
import io

import anms.routes.transcoder as transcoder

# for handling report set and exec set
import ace
import ace.models


logger = OpenSearchLogger(__name__, log_console=True)

Expand All @@ -69,47 +70,70 @@ async def all_report_name():

@router.get("/entry/name/{agent_id}", status_code=status.HTTP_200_OK, response_model=list,
tags=["REPORTS"])
async def report_def_by_id(agent_id: str):
async def report_def_by_id(agent_id: int):
# select all reports belonging to the agent
agent_id = agent_id.strip()
final_res = []
agent_id_str = ""
stmt = select(Report).where(Report.agent_id == agent_id)
agent_id_stmt = select(RegisteredAgent).where(RegisteredAgent.registered_agents_id == agent_id)
async with get_async_session() as session:
result: Result = await session.scalars(stmt)
# Execution set uses URI as agent_id
result_agent: Result = await session.scalars(agent_id_stmt)
agent_id_str = result_agent.one_or_none()
agent_id_str = agent_id_str.agent_endpoint_uri
for res in result.all():
# select from exec_set
correlator_nonce = res.correlator_nonce
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id, ExecutionSet.correlator_nonce == correlator_nonce) )
result: Result = await session.scalars(stmt)
exc_set = result.all()
for res in exc_set:
ari_val = ""
if(res):
ari_val = await transcoder.transcoder_put_cbor_await("0x"+res.entries.hex())
ari_val = ari_val['data']
addition = {'exec_set': ari_val,'correlator_nonce':correlator_nonce}
if addition not in final_res:
final_res.append(addition)

try:
nonce_cbor = res.nonce_cbor
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id_str, ExecutionSet.nonce_cbor == nonce_cbor) )
result: Result = await session.scalars(stmt)
exc_set = result.all()
for res in exc_set:
ari_val = ""
if(res):
hex_str = res.entries.hex()
hex_str = "0x"+hex_str.upper()
ari_val = await transcoder.transcoder_put_cbor_await(hex_str)
ari_val = ari_val['data']
logger.info(str(nonce_cbor))
addition = {'exec_set': ari_val,'nonce_cbor':str(nonce_cbor)}
if addition not in final_res:
final_res.append(addition)
except Exception as e:
logger.error(f"Error {e}, while processing nonce:{nonce_cbor} for agent: {agent_id_str}")

return final_res


# entries tabulated returns header and values in correct order
@router.get("/entries/table/{agent_id}/{correlator_nonce}", status_code=status.HTTP_200_OK,
@router.get("/entries/table/{agent_id}/{nonce_cbor}", status_code=status.HTTP_200_OK,
response_model=list)
async def report_ac(agent_id: str, correlator_nonce: int):
agent_id = agent_id.strip()
final_res = []
async def report_ac(agent_id: int, nonce_cbor: str):

ari = None
dec = ace.ari_cbor.Decoder()
buf = io.StringIO()
enc = ace.ari_text.Encoder()
try:
nonce_cbor = ast.literal_eval(nonce_cbor)
except Exception as e:
logger.error(f"{e} while processing nonce")
return []

agent_id_str =""
agent_id_stmt = select(RegisteredAgent).where(RegisteredAgent.registered_agents_id == agent_id)
async with get_async_session() as session:
result_agent: Result = await session.scalars(agent_id_stmt)
agent_id_str = result_agent.one_or_none()
agent_id_str = agent_id_str.agent_endpoint_uri

# Load in adms
# get command that made the report as first entry
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id, ExecutionSet.correlator_nonce == correlator_nonce) )
stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id_str, ExecutionSet.nonce_cbor == nonce_cbor) )
async with get_async_session() as session:
result: Result = await session.scalars(stmt)

# there should only be one execution per agent per correlator_nonce
# there should only be one execution per agent per nonce_cbor
# in the event that two occur pull the latest one
result = result.all()
exec_set_dir = {}
Expand All @@ -124,13 +148,13 @@ async def report_ac(agent_id: str, correlator_nonce: int):
ari = dec.decode(io.BytesIO(in_bytes))

except Exception as err:
logger.info(err)
logger.error(err)

# current ARI should be an exection set
if ari:
if type(ari.value) == ace.ari.ExecutionSet:
try:
enc = ace.ari_text.Encoder()

# run through targets and their parameters to get all things parts translated
for targ in ari.value.targets:
buf = io.StringIO()
Expand All @@ -149,12 +173,12 @@ async def report_ac(agent_id: str, correlator_nonce: int):
exec_set_dir[out_text_targ] = [exec_set_entry]

except Exception as err:
logger.info(err)
logger.error(err)


# final_res.append(exec_set_entry)
ari = None
stmt = select(Report).where(and_(Report.agent_id == agent_id, Report.correlator_nonce == correlator_nonce) )
stmt = select(Report).where(and_(Report.agent_id == agent_id, Report.nonce_cbor == nonce_cbor) )
async with get_async_session() as session:
result: Result = await session.scalars(stmt)
for res in result.all():
Expand Down Expand Up @@ -188,9 +212,7 @@ async def report_ac(agent_id: str, correlator_nonce: int):

exec_set_dir[out_text].append(addition)
except Exception as err:
logger.error(err)


logger.error(err)

return list(exec_set_dir.values())

4 changes: 2 additions & 2 deletions anms-core/anms/routes/adms/adm.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def handle_adm(admset: ace.AdmSet, adm_file: ace.models.AdmModule, session
# Use CAmPython to generate sql
out_path = "" # This is empty string since we don't need to write the generated sql to a file
sql_dialect = 'pgsql'
writer = create_sql.Writer(admset, adm_file, out_path, False, dialect=sql_dialect)
writer = create_sql.Writer(admset, adm_file, out_path, sql_dialect)
string_buffer = io.StringIO()
writer.write(string_buffer)

Expand Down Expand Up @@ -252,7 +252,7 @@ async def update_adm(file: UploadFile, request: Request):
logger.info(f"{info_message} adm: {adm_file.norm_name}")
out_path = "" # This is empty string since we don't need to write the generated sql to a file
sql_dialect = 'pgsql'
writer = create_sql.Writer(admset, adm_file, out_path, dialect=sql_dialect)
writer = create_sql.Writer(admset, adm_file, out_path, sql_dialect)
string_buffer = io.StringIO()
try: # catching error in sql creation
writer.write(string_buffer)
Expand Down
8 changes: 4 additions & 4 deletions anms-core/anms/routes/agent_parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ def send_parameter(agent_id: int, agent_parameter_id: int, command_parameters: d
try:
session.add(in_stm)
session.commit()
# getting agent_id_string
agent_id_string = session.execute(select(RegisteredAgent.agent_id_string).where(
# getting agent_endpoint_uri
agent_endpoint_uri = session.execute(select(RegisteredAgent.agent_endpoint_uri).where(
RegisteredAgent.registered_agents_id == agent_id))
agent_id_string = agent_id_string.one_or_none()[0]
agent_endpoint_uri = agent_endpoint_uri.one_or_none()[0]
err = process_command(agent_parameter_id, command_parameters,
AGENT_PARAMETER.get_agent(),agent_id_string)
AGENT_PARAMETER.get_agent(),agent_endpoint_uri)
if not err:
return err
except IntegrityError:
Expand Down
Loading
Loading