Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/plugin/aitools/api/decorators/api_meta.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""
ApiMeta module for defining API metadata such as method, path, query, body, response, summary, description, tags, internal, and deprecated.
ApiMeta module for defining API metadata such as
method, path, query, body, response, summary, description, tags,
and deprecated.
"""

# pylint: disable=too-many-instance-attributes
from dataclasses import dataclass
from typing import Generic, Literal, Optional, Type, TypeVar

Expand All @@ -15,7 +18,8 @@

@dataclass(frozen=True)
class ApiMeta(Generic[QueryT, BodyT, RespT]):
# HTTP request configuration
"""HTTP API metadata."""

method: str
path: str
query: Optional[Type[QueryT]] = None
Expand Down
1 change: 1 addition & 0 deletions core/plugin/aitools/api/decorators/api_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
ApiService module for registering API services.
"""

# pylint: disable=too-many-arguments
from typing import Callable, Literal, Optional

from plugin.aitools.api.decorators.api_meta import ApiMeta, BodyT, QueryT, RespT
Expand Down
10 changes: 5 additions & 5 deletions core/plugin/aitools/api/routes/endpoint_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ async def endpoint_async(
endpoint_async.__name__ = service_func.__name__
endpoint_async.__doc__ = meta.description or service_func.__doc__
return endpoint_async
else:
cast(Any, endpoint_sync).__signature__ = inspect.Signature(params)
endpoint_sync.__name__ = service_func.__name__
endpoint_sync.__doc__ = meta.description or service_func.__doc__
return endpoint_sync

cast(Any, endpoint_sync).__signature__ = inspect.Signature(params)
endpoint_sync.__name__ = service_func.__name__
endpoint_sync.__doc__ = meta.description or service_func.__doc__
return endpoint_sync
3 changes: 3 additions & 0 deletions core/plugin/aitools/app/start_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from plugin.aitools.api.routes.register import register_api_services
from plugin.aitools.common.clients.aiohttp_client import close_aiohttp_session
from plugin.aitools.common.exceptions.exceptions import register_exception_handlers
from plugin.aitools.common.log.logger import init_uvicorn_logger
from plugin.aitools.const.const import OTLP_ENABLE_KEY, SERVICE_PORT_KEY
from plugin.aitools.utils.otlp_utils import (
init_kafka_send_workers,
Expand Down Expand Up @@ -106,6 +107,8 @@ def start_uvicorn() -> None:
if not (service_port := os.getenv(SERVICE_PORT_KEY)):
raise ValueError(f"Missing {SERVICE_PORT_KEY} environment variable")

init_uvicorn_logger()

print(f"🚀 Starting server on port {service_port}")
uvicorn_config = uvicorn.Config(
app=aitools_app(),
Expand Down
80 changes: 54 additions & 26 deletions core/plugin/aitools/common/clients/hooks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json

from common.otlp.trace.span import SPAN_SIZE_LIMIT
from plugin.aitools.api.schemas.types import ErrorResponse
from plugin.aitools.common.clients.adapters import ClientT, SpanLike
from plugin.aitools.common.log.logger import log


def add_info(span: SpanLike, key: str, value: str) -> None:
Expand All @@ -12,39 +14,65 @@ def add_info(span: SpanLike, key: str, value: str) -> None:

class WebSocketSpanHooks:
def setup(self, client: ClientT, span: SpanLike) -> None:
span.set_attributes(
{
"ws_url": client.url,
"ws_params": json.dumps(client.ws_params, indent=2, ensure_ascii=False),
"ws_kwargs": json.dumps(client.kwargs, indent=2, ensure_ascii=False),
}
)
try:
span.set_attributes(
{
"ws_url": client.url,
"ws_params": json.dumps(
client.ws_params, indent=2, ensure_ascii=False
),
"ws_kwargs": json.dumps(
client.kwargs, indent=2, ensure_ascii=False
),
}
)
except Exception as e:
log.exception(
f"Failed to set attributes for span in WebSocketSpanHooks: {e}"
)

async def teardown(self, client: ClientT, span: SpanLike) -> None:
if client.send_data_list:
send_data = json.dumps(client.send_data_list, indent=2, ensure_ascii=False)
add_info(span, "Send data", send_data)
if client.recv_data_list:
recv_data = json.dumps(client.recv_data_list, indent=2, ensure_ascii=False)
add_info(span, "Recv data", recv_data)
try:
if client.send_data_list:
send_data = json.dumps(
client.send_data_list, indent=2, ensure_ascii=False
)
add_info(span, "Send data", send_data)
if client.recv_data_list:
recv_data = json.dumps(
client.recv_data_list, indent=2, ensure_ascii=False
)
add_info(span, "Recv data", recv_data)

await client.close()
await client.close()
except Exception as e:
log.exception(
f"Failed to add info events for span in WebSocketSpanHooks: {e}"
)


class HttpSpanHooks:
def setup(self, client: ClientT, span: SpanLike) -> None:
span.set_attributes(
{"Request URL": client.url, "Request method": client.method}
)
try:
span.set_attributes(
{"Request URL": client.url, "Request method": client.method}
)

kwargs_str = json.dumps(client.kwargs, indent=2, ensure_ascii=False)
add_info(span, "Request kwargs", kwargs_str)
kwargs_str = json.dumps(client.kwargs, indent=2, ensure_ascii=False)
add_info(span, "Request kwargs", kwargs_str)
except Exception as e:
log.exception(f"Failed to set attributes for span in HttpSpanHooks: {e}")

async def teardown(self, client: ClientT, span: SpanLike) -> None:
if isinstance(client.response.data["content"], bytes):
response_str = (
f"Binary data, length: {len(client.response.data['content'])}"
)
else:
response_str = client.response.model_dump_json()
add_info(span, "Response", response_str)
try:
if isinstance(client.response, ErrorResponse):
response_str = client.response.model_dump_json()
elif isinstance(client.response.data.get("content", None), bytes):
response_str = (
f"Binary data, length: {len(client.response.data['content'])}"
)
else:
response_str = client.response.model_dump_json()
add_info(span, "Response", response_str)
except Exception as e:
log.exception(f"Failed to add info events for span in HttpSpanHooks: {e}")
8 changes: 7 additions & 1 deletion core/plugin/aitools/common/log/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@


def init_uvicorn_logger() -> None:
logger_names = ("uvicorn.asgi", "uvicorn.access", "uvicorn")
logger_names = (
"uvicorn.asgi",
"uvicorn.access",
"uvicorn",
"uvicorn.error",
"fastapi",
)

# change handler for default uvicorn logger
logging.getLogger().handlers = [InterceptHandler()]
Expand Down
2 changes: 2 additions & 0 deletions core/plugin/aitools/service/dial_test/dial_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Dial test service module providing health checks and service availability monitoring.
"""

# pylint: disable=line-too-long,broad-exception-caught,unused-argument
from typing import Any, Dict, Optional

import requests
Expand All @@ -20,6 +21,7 @@
async def dial_test_servic(
request: Request,
) -> Dict[str, Any] | None:
"""Dial test service"""
return dial_test_main(
method="GET",
url="http://localhost/health",
Expand Down
26 changes: 22 additions & 4 deletions core/plugin/aitools/service/dial_test/dial_test_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""
Dial test client
"""

# pylint: disable=too-many-arguments,too-few-public-methods,broad-exception-caught
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
Expand All @@ -7,12 +12,16 @@


class ErrorResponse(TypedDict):
"""Error response data type"""

code: int
message: str
data: Dict[str, str]


class APIConfiguration:
"""API configuration data type"""

def __init__(
self,
target_url: str,
Expand All @@ -32,6 +41,7 @@ def __init__(
self.call_frequency = call_frequency

def dict(self) -> dict:
"""Return API configuration as a dictionary."""
return {
"url": self.url,
"method": self.method,
Expand All @@ -43,9 +53,12 @@ def dict(self) -> dict:


class APITester:
"""API tester class"""

def execute_request(
self, config: APIConfiguration
) -> Union[ErrorResponse, Dict[str, Any]]:
"""Execute API request with specified configuration."""
ex_res: ErrorResponse = {
"code": -1,
"message": "failed",
Expand All @@ -61,8 +74,7 @@ def execute_request(
str(res["code"]) + "_" + str(config.url).rsplit("/", maxsplit=1)[-1]
)
return res
else:
return {}
return {}
except requests.exceptions.Timeout:
ex_res["data"]["msg"] = "The request timed out."
# print("The request timed out.")
Expand All @@ -76,7 +88,10 @@ def execute_request(


class MainRunner:
"""Main runner class"""

def __init__(self, max_workers: Optional[int] = None) -> None:
"""Initialize the main runner."""
# load_dotenv('../../../dialtest.env')
self.api_configs = self.load_api_configs()
self.tester = APITester()
Expand All @@ -86,6 +101,7 @@ def __init__(self, max_workers: Optional[int] = None) -> None:

# List of interfaces to test
def interface_list(self) -> List[str]:
"""Get list of interfaces to test."""
# int_list = ["TTS", "SMARTTS"]

# print('("INTERFACE_LIST_STR"):',os.getenv("INTERFACE_LIST_STR"))
Expand All @@ -97,6 +113,7 @@ def interface_list(self) -> List[str]:
return int_list

def load_api_configs(self) -> List[APIConfiguration]:
"""Load API configurations from environment variables."""
configs = []
for prefix in self.interface_list():
configs.append(
Expand All @@ -106,14 +123,15 @@ def load_api_configs(self) -> List[APIConfiguration]:
headers=json.loads(os.getenv(f"{prefix}_HEADERS", "{}")),
params=json.loads(os.getenv(f"{prefix}_PARAMS", "{}")),
payload=json.loads(os.getenv(f"{prefix}_PAYLOAD", "{}")),
success_code=int(os.getenv(f"{prefix}_SUCCESS_CODE", -1)),
call_frequency=int(os.getenv(f"{prefix}_CALL_FREQUENCY", 1)),
success_code=int(os.getenv(f"{prefix}_SUCCESS_CODE", "-1")),
call_frequency=int(os.getenv(f"{prefix}_CALL_FREQUENCY", "1")),
)
)

return configs

def run_tests(self) -> Dict[str, Any]:
"""Run API tests and return results."""
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(self.tester.execute_request, config): config
Expand Down
9 changes: 8 additions & 1 deletion core/plugin/aitools/service/dial_test/mock_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""
Test the mock service.
"""

# pylint: disable=unused-argument
import requests
from fastapi import Request
from plugin.aitools.api.decorators.api_service import api_service
Expand All @@ -16,6 +21,7 @@
async def async_mock_test_service(
request: Request,
) -> BaseResponse:
"""Async Mock test service"""
client = HttpClient(method="GET", url="http://localhost:8086/ping")
response = await client.request()
# file_name = "test.txt"
Expand All @@ -35,7 +41,8 @@ async def async_mock_test_service(
def sync_mock_test_service(
request: Request,
) -> BaseResponse:
response = requests.get("http://localhost:8086/ping")
"""Sync Mock test service"""
response = requests.get("http://localhost:8086/ping", timeout=10)
# file_name = "test.txt"
# file_bytes = response.content
# oss_service = get_oss_service()
Expand Down
Loading
Loading