Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
09a7c34
links() and transports() in info()
milyin Dec 5, 2025
a6577c6
cargo fmt
milyin Dec 5, 2025
cc43776
accessors to private fileds
milyin Dec 5, 2025
25022af
removed no_run, tested docs
milyin Dec 5, 2025
2685fb7
events first implementation
milyin Dec 5, 2025
7e6a860
connectivity event handling separated
milyin Dec 5, 2025
04f42f0
use weaksession
milyin Dec 5, 2025
41e36de
cargo fmt
milyin Dec 6, 2025
5a0f9c7
clippy fix
milyin Dec 8, 2025
5a6db0a
added cancellation token to transportevents
milyin Dec 9, 2025
1102aa9
cancellation token support
milyin Dec 9, 2025
e9d018c
cancellation tests
milyin Dec 9, 2025
5ad9e9f
test sync fixes
milyin Dec 9, 2025
7c5edff
use statements
milyin Dec 9, 2025
c90ea58
clippy fix
milyin Dec 9, 2025
ccb958b
Merge branch 'main' into connectivity_api2
milyin Dec 9, 2025
5f1bbea
crgo lock
milyin Dec 9, 2025
a231238
doc updates
milyin Dec 9, 2025
53974df
filtering by transport
milyin Dec 9, 2025
5b9469d
get_links corrected
milyin Dec 9, 2025
a4039cc
events methods shortened
milyin Dec 9, 2025
949cc15
renamed to _listemer
milyin Dec 10, 2025
ae52a53
Merge branch 'main' into connectivity_api2
milyin Dec 12, 2025
6bdea98
info transport builder separated, cancellation removed
milyin Dec 12, 2025
71e1caf
info links builder separated, cancellation removed
milyin Dec 12, 2025
c49f803
Listener objects
milyin Dec 12, 2025
a9e243d
id is u32
milyin Dec 12, 2025
9ec45ca
transport events listener moved to session
milyin Dec 12, 2025
13ce863
runtime removed from info()
milyin Dec 12, 2025
e6cdc04
rustfmt
milyin Dec 12, 2025
60f6bee
background added to transporteventlistener
milyin Dec 12, 2025
aff5b1d
background added to transport events listener
milyin Dec 13, 2025
52afbec
test refactor
milyin Dec 13, 2025
5f8e792
session open moved to common
milyin Dec 13, 2025
ca0f8f2
test shortened
milyin Dec 13, 2025
47dba25
useless test removed
milyin Dec 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
416 changes: 416 additions & 0 deletions zenoh/src/api/builders/info_links.rs

Large diffs are not rendered by default.

439 changes: 439 additions & 0 deletions zenoh/src/api/builders/info_transport.rs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions zenoh/src/api/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

pub(crate) mod close;
pub(crate) mod info;
pub(crate) mod info_links;
pub(crate) mod info_transport;
pub(crate) mod liveliness;
pub(crate) mod matching_listener;
pub(crate) mod publisher;
Expand Down
141 changes: 141 additions & 0 deletions zenoh/src/api/connectivity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

//! Connectivity event handler - independent from adminspace
//!
//! This handler subscribes to transport events and broadcasts them
//! to user-registered callbacks through the connectivity API.

use std::sync::Arc;

use zenoh_result::ZResult;
use zenoh_transport::{
TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
};

use crate::api::session::WeakSession;
#[cfg(feature = "unstable")]
use crate::sample::SampleKind;
/// Handler for connectivity events - independent from adminspace
#[cfg(feature = "unstable")]
pub(crate) struct ConnectivityHandler {
session: WeakSession,
}

#[cfg(feature = "unstable")]
impl ConnectivityHandler {
pub(crate) fn new(session: WeakSession) -> Self {
Self { session }
}
}

#[cfg(feature = "unstable")]
impl TransportEventHandler for ConnectivityHandler {
fn new_unicast(
&self,
peer: TransportPeer,
_transport: zenoh_transport::unicast::TransportUnicast,
) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
// Broadcast transport opened event
self.session
.broadcast_transport_event(SampleKind::Put, &peer);

// Return ConnectivityPeerHandler
Ok(Arc::new(ConnectivityPeerHandler {
session: self.session.clone(),
peer_zid: peer.zid,
peer,
}))
}

fn new_multicast(
&self,
_transport: zenoh_transport::multicast::TransportMulticast,
) -> ZResult<Arc<dyn TransportMulticastEventHandler>> {
Ok(Arc::new(ConnectivityMulticastHandler {
session: self.session.clone(),
}))
}
}

/// Peer handler for connectivity events
#[cfg(feature = "unstable")]
pub(crate) struct ConnectivityPeerHandler {
session: WeakSession,
peer_zid: zenoh_protocol::core::ZenohIdProto,
peer: TransportPeer,
}

#[cfg(feature = "unstable")]
impl TransportPeerEventHandler for ConnectivityPeerHandler {
fn handle_message(&self, _msg: zenoh_protocol::network::NetworkMessageMut) -> ZResult<()> {
// Connectivity doesn't need to handle messages
Ok(())
}

fn new_link(&self, link: zenoh_link::Link) {
// Broadcast link added event
self.session
.runtime
.broadcast_link_event(SampleKind::Put, self.peer_zid, &link);
}

fn del_link(&self, link: zenoh_link::Link) {
// Broadcast link removed event
self.session
.runtime
.broadcast_link_event(SampleKind::Delete, self.peer_zid, &link);
}

fn closed(&self) {
// Broadcast transport closed event
self.session
.broadcast_transport_event(SampleKind::Delete, &self.peer);
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
}

/// Multicast handler for connectivity events
#[cfg(feature = "unstable")]
pub(crate) struct ConnectivityMulticastHandler {
session: WeakSession,
}

#[cfg(feature = "unstable")]
impl TransportMulticastEventHandler for ConnectivityMulticastHandler {
fn new_peer(&self, peer: TransportPeer) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
// Broadcast transport opened event

self.session
.broadcast_transport_event(SampleKind::Put, &peer);

// Return ConnectivityPeerHandler
Ok(Arc::new(ConnectivityPeerHandler {
session: self.session.clone(),
peer_zid: peer.zid,
peer,
}))
}

fn closed(&self) {
// Nothing to do for multicast group closure
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
}
Loading
Loading