This service handles incoming message translation from MQTT to Kafka for the Cyberpartner system. It consists of multiple microservices that process different event types, ensuring proper communication between badge devices (MQTT) and backend services (Kafka).
The service translates various MQTT events from badges to Kafka topics:
- Heartbeat: Reports available heap from the badge, received every 5 minutes
- Achievements: Parses MQTT topic and extracts achievement type from badge
- Game Play: Handles game play events from badges
- Create: Handles new Cyberpartner creation events
- Death: Processes Cyberpartner death events
- Update State: Manages state updates from badges
- Update Inventory: Processes inventory state changes
- Store: Manages store-related updates
- World Events: Handles scheduled (day) and random world events
- Kiosk: Processes kiosk interaction events (TBD)
- Deadbeef: Emitted whenever an iButton spoof event is detected
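As a sketch of the translation step these services share, the snippet below shows how a translator might extract the badge ID and event type from an egress MQTT topic. The helper name is hypothetical; the topic layout follows the create example later in this README.

```python
def parse_egress_topic(topic: str) -> dict:
    """Extract the badge ID and event type from an egress MQTT topic.

    Assumed layout: cackalacky/badge/egress/<badge_id>/<domain>/<action>
    """
    parts = topic.split("/")
    badge_id = parts[3]
    event_type = ".".join(parts[4:])  # e.g. "cp.create"
    return {"badge_id": badge_id, "event_type": event_type}


print(parse_egress_topic("cackalacky/badge/egress/abc123/cp/create"))
# → {'badge_id': 'abc123', 'event_type': 'cp.create'}
```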
- Containerized Microservices: Each translation service runs as a separate container to scale independently
- Infrastructure: Depends on Kafka, Redis, and MQTT broker
- CI/CD: Automated builds and deployments via CircleCI and ArgoCD
- Configuration: Uses Doppler for secrets management
Requires Python 3.12 and Poetry (installed via pipx):

```shell
pipx install poetry
poetry --version
```

See https://pipx.pypa.io/stable/installation/ (and https://scoop.sh/ on Windows) for installation details.
Docker Compose is used with Doppler to inject secrets and environment variables.
```shell
# Build the images
doppler run -- docker compose -f docker-compose.services.yaml build

# Run with infrastructure (Kafka, Redis, etc.)
doppler run -- docker compose -f docker-compose.infrastructure.yaml up -d
doppler run -- docker compose -f docker-compose.services.yaml up -d
```

To run a specific translation service:
```shell
# Run a specific MQTT to Kafka service
doppler run -- docker compose -f docker-compose.services.yaml run translate-mqtt-kafka-create
# For other services, modify the service name accordingly
```

We expect data to flow from MQTT to Kafka. An example of creating a new Cyberpartner looks like this:
```python
import json
import logging
import os
from datetime import datetime, timezone

import ntplib
from lockfale_connectors.mqtt.mqtt_publisher import MQTTPublisher

logger = logging.getLogger(__name__)


def get_ntp_time() -> datetime:
    """Fetch the current time from an NTP server."""
    response = ntplib.NTPClient().request("pool.ntp.org", version=3)
    return datetime.fromtimestamp(response.tx_time, tz=timezone.utc)


if __name__ == "__main__":
    mqtt_host = os.getenv("MQTT_HOST")
    mqtt_port = int(os.getenv("MQTT_PORT"))
    dplr_env = os.getenv("DOPPLER_ENVIRONMENT")
    badge_id = os.getenv("BADGE_ID")  # ID of the badge originating the event

    mqtt_pub = MQTTPublisher("publisher-local-create-cp")
    mqtt_pub.connect(keep_alive=10)
    mqtt_pub.client.loop_start()

    _topic = f"cackalacky/badge/egress/{badge_id}/cp/create"

    # We used NTP to keep the backend and the badges relatively in sync;
    # however, any clock source works, as long as the payload includes an
    # epoch timestamp.
    ntp_dt = get_ntp_time()
    epoch_ts = ntp_dt.timestamp()

    json_obj = {"ts": epoch_ts}
    logger.info(json_obj)

    msg_info = mqtt_pub.publish(
        _topic,
        json.dumps(json_obj),
    )
    msg_info.wait_for_publish()
    mqtt_pub.disconnect()
```

From here, the `create_cyberpartner.py` service will pick up the message and translate it to Kafka by enriching the payload.
At a bare minimum, we enrich the payload with the following:
- `badge_id`: The badge ID of the source of the event (extracted from the MQTT topic)
- `event_uuid`: A unique UUID for the event
- `event_timestamp`: The timestamp of the event
- `message_received_ts`: The timestamp the message was received by the translation service
- `event_source`: The source of the event (badge, etc.)
- `event_type`: The type of event (cp.create, cp.death, etc.)
- `event_subtype`: The subtype of the event (button.touch, cp.death, mass.locusts, etc.)
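A minimal sketch of this enrichment step, assuming the topic layout from the create example above; the function name and the literal event-type values for anything other than `cp.create` are hypothetical:

```python
import time
import uuid


def enrich_payload(payload: dict, topic: str) -> dict:
    """Add the minimum enrichment fields before publishing to Kafka.

    Field names follow the list above; the topic is assumed to look like
    cackalacky/badge/egress/<badge_id>/cp/create.
    """
    badge_id = topic.split("/")[3]  # badge ID embedded in the MQTT topic
    return {
        **payload,
        "badge_id": badge_id,
        "event_uuid": str(uuid.uuid4()),
        "event_timestamp": payload.get("ts"),
        "message_received_ts": time.time(),
        "event_source": "badge",
        "event_type": "cp.create",
        "event_subtype": "cp.create",
    }


enriched = enrich_payload({"ts": 1715000000.0}, "cackalacky/badge/egress/abc123/cp/create")
print(enriched["badge_id"])  # → abc123
```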
The service is deployed to Kubernetes using:
- CircleCI for CI/CD pipeline
- ArgoCD for GitOps deployment
- ECR for container registry
```shell
poetry run isort .
poetry run black .
```

- taskfile