diff --git a/cmd/kg/handlers.go b/cmd/kg/handlers.go index be00cee7..e90f0496 100644 --- a/cmd/kg/handlers.go +++ b/cmd/kg/handlers.go @@ -25,6 +25,7 @@ import ( "os/exec" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/mesh" ) @@ -48,6 +49,11 @@ func (h *graphHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("failed to list peers: %v", err), http.StatusInternalServerError) return } + pos, err := h.mesh.Pods().List() + if err != nil { + http.Error(w, fmt.Sprintf("failed to list pods: %v", err), http.StatusInternalServerError) + return + } nodes := make(map[string]*mesh.Node) for _, n := range ns { @@ -65,7 +71,14 @@ func (h *graphHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { peers[p.Name] = p } } - topo, err := mesh.NewTopology(nodes, peers, h.granularity, *h.hostname, 0, wgtypes.Key{}, h.subnet, h.serviceCIDRs, nodes[*h.hostname].PersistentKeepalive, nil) + pods := make(map[types.UID]*mesh.Pod) + for _, p := range pos { + if p.Ready() { + pods[p.Uid] = p + } + } + + topo, err := mesh.NewTopology(nodes, peers, pods, h.granularity, *h.hostname, 0, wgtypes.Key{}, h.subnet, h.serviceCIDRs, nodes[*h.hostname].PersistentKeepalive, nil) if err != nil { http.Error(w, fmt.Sprintf("failed to create topology: %v", err), http.StatusInternalServerError) return diff --git a/cmd/kg/main.go b/cmd/kg/main.go index 66e39a09..5b96809c 100644 --- a/cmd/kg/main.go +++ b/cmd/kg/main.go @@ -60,6 +60,7 @@ var ( }, ", ") availableCompatibilities = strings.Join([]string{ "flannel", + "calico-bgp", }, ", ") availableEncapsulations = strings.Join([]string{ string(encapsulation.Never), @@ -214,11 +215,15 @@ func runRoot(_ *cobra.Command, _ []string) error { } var enc encapsulation.Encapsulator + var watchPods = false switch compatibility { case "flannel": enc = encapsulation.NewFlannel(e) case "cilium": enc = encapsulation.NewCilium(e) + case "calico-bgp": + watchPods = true + 
fallthrough default: enc = encapsulation.NewIPIP(e) } @@ -241,7 +246,7 @@ func runRoot(_ *cobra.Command, _ []string) error { c := kubernetes.NewForConfigOrDie(config) kc := kiloclient.NewForConfigOrDie(config) ec := apiextensions.NewForConfigOrDie(config) - b = k8s.New(c, kc, ec, topologyLabel, log.With(logger, "component", "k8s backend")) + b = k8s.New(c, kc, ec, topologyLabel, log.With(logger, "component", "k8s backend"), watchPods) default: return fmt.Errorf("backend %v unknown; possible values are: %s", backend, availableBackends) } @@ -259,7 +264,7 @@ func runRoot(_ *cobra.Command, _ []string) error { serviceCIDRs = append(serviceCIDRs, s) } - m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, cleanUp, cleanUpIface, createIface, mtu, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, serviceCIDRs, log.With(logger, "component", "kilo"), registry) + m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, cleanUp, cleanUpIface, createIface, mtu, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, serviceCIDRs, log.With(logger, "component", "kilo"), registry, watchPods) if err != nil { return fmt.Errorf("failed to create Kilo mesh: %v", err) } diff --git a/cmd/kgctl/graph.go b/cmd/kgctl/graph.go index cb104089..d99925e4 100644 --- a/cmd/kgctl/graph.go +++ b/cmd/kgctl/graph.go @@ -19,6 +19,7 @@ import ( "github.com/spf13/cobra" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/mesh" ) @@ -67,7 +68,8 @@ func runGraph(_ *cobra.Command, _ []string) error { peers[p.Name] = p } } - t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, 0, wgtypes.Key{}, subnet, nil, nodes[hostname].PersistentKeepalive, nil) + pods := make(map[types.UID]*mesh.Pod) + t, err := mesh.NewTopology(nodes, peers, pods, opts.granularity, hostname, 0, wgtypes.Key{}, subnet, nil, nodes[hostname].PersistentKeepalive, nil) if err != nil { return fmt.Errorf("failed to 
create topology: %w", err) } diff --git a/cmd/kgctl/main.go b/cmd/kgctl/main.go index 9a6c58ac..a753c2d9 100644 --- a/cmd/kgctl/main.go +++ b/cmd/kgctl/main.go @@ -94,7 +94,7 @@ func runRoot(c *cobra.Command, _ []string) error { c := kubernetes.NewForConfigOrDie(config) opts.kc = kiloclient.NewForConfigOrDie(config) ec := apiextensions.NewForConfigOrDie(config) - opts.backend = k8s.New(c, opts.kc, ec, topologyLabel, log.NewNopLogger()) + opts.backend = k8s.New(c, opts.kc, ec, topologyLabel, log.NewNopLogger(), false) default: return fmt.Errorf("backend %s unknown; posible values are: %s", backend, availableBackends) } diff --git a/cmd/kgctl/showconf.go b/cmd/kgctl/showconf.go index 79169f7b..5ed1ca67 100644 --- a/cmd/kgctl/showconf.go +++ b/cmd/kgctl/showconf.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/json" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/k8s/apis/kilo/v1alpha1" "github.com/squat/kilo/pkg/mesh" @@ -152,7 +153,9 @@ func runShowConfNode(_ *cobra.Command, args []string) error { } } - t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, int(opts.port), wgtypes.Key{}, subnet, nil, nodes[hostname].PersistentKeepalive, nil) + pods := make(map[types.UID]*mesh.Pod) + + t, err := mesh.NewTopology(nodes, peers, pods, opts.granularity, hostname, int(opts.port), wgtypes.Key{}, subnet, nil, nodes[hostname].PersistentKeepalive, nil) if err != nil { return fmt.Errorf("failed to create topology: %w", err) } @@ -251,11 +254,13 @@ func runShowConfPeer(_ *cobra.Command, args []string) error { return fmt.Errorf("did not find any peer named %q in the cluster", peer) } + pods := make(map[types.UID]*mesh.Pod) + pka := time.Duration(0) if p := peers[peer].PersistentKeepaliveInterval; p != nil { pka = *p } - t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, mesh.DefaultKiloPort, wgtypes.Key{}, subnet, nil, pka, nil) + t, 
err := mesh.NewTopology(nodes, peers, pods, opts.granularity, hostname, mesh.DefaultKiloPort, wgtypes.Key{}, subnet, nil, pka, nil) if err != nil { return fmt.Errorf("failed to create topology: %w", err) } diff --git a/manifests/kilo-k3s-calico-bgp.yaml b/manifests/kilo-k3s-calico-bgp.yaml new file mode 100644 index 00000000..04aafcdb --- /dev/null +++ b/manifests/kilo-k3s-calico-bgp.yaml @@ -0,0 +1,176 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: kilo + namespace: kube-system +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kilo +rules: +- apiGroups: + - "" + resources: + - nodes + verbs: + - list + - patch + - watch +- apiGroups: + - kilo.squat.ai + resources: + - peers + verbs: + - list + - watch +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions + verbs: + - get +- apiGroups: + - "" + resources: + - pods + verbs: + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: kilo +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: kilo +subjects: + - kind: ServiceAccount + name: kilo + namespace: kube-system +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: kilo-scripts + namespace: kube-system +data: + init.sh: | + #!/bin/sh + cat > /etc/kubernetes/kubeconfig < add allowed + if e.Pod.IP != nil { + m.pods[key] = e.Pod + diff = true + } + } + case DeleteEvent: + // Remove allowed + delete(m.pods, key) + diff = true + } + m.mu.Unlock() + if diff { + m.applyTopology() + } +} + // checkIn will try to update the local node's LastSeen timestamp // in the backend. func (m *Mesh) checkIn(ctx context.Context) { @@ -473,6 +524,16 @@ func (m *Mesh) applyTopology() { peers[k] = m.peers[k] readyPeers++ } + pods := make(map[types.UID]*Pod) + //var readyPods float64 + for k := range m.pods { + if !m.pods[k].Ready() { + continue + } + // Make it point the pod without copy. 
+ pods[k] = m.pods[k] + //readyPods++ + } m.nodesGuage.Set(readyNodes) m.peersGuage.Set(readyPeers) // We cannot do anything with the topology until the local node is available. @@ -505,7 +566,8 @@ func (m *Mesh) applyTopology() { natEndpoints := discoverNATEndpoints(nodes, peers, wgDevice, m.logger) nodes[m.hostname].DiscoveredEndpoints = natEndpoints - t, err := NewTopology(nodes, peers, m.granularity, m.hostname, nodes[m.hostname].Endpoint.Port(), m.priv, m.subnet, m.serviceCIDRs, nodes[m.hostname].PersistentKeepalive, m.logger) + + t, err := NewTopology(nodes, peers, pods, m.granularity, m.hostname, nodes[m.hostname].Endpoint.Port(), m.priv, m.subnet, m.serviceCIDRs, nodes[m.hostname].PersistentKeepalive, m.logger) if err != nil { level.Error(m.logger).Log("error", err) m.errorCounter.WithLabelValues("apply").Inc() @@ -720,6 +782,14 @@ func peersAreEqual(a, b *Peer) bool { (a.PersistentKeepaliveInterval == nil || *a.PersistentKeepaliveInterval == *b.PersistentKeepaliveInterval) } +func podsAreEqual(a, b *Pod) bool { + if a == nil || b == nil { + return a == b + } + return a.Uid == b.Uid && + ipNetsEqual(a.IP, b.IP) +} + func ipNetsEqual(a, b *net.IPNet) bool { if a == nil && b == nil { return true diff --git a/pkg/mesh/topology.go b/pkg/mesh/topology.go index ca22bf61..7c66fa8c 100644 --- a/pkg/mesh/topology.go +++ b/pkg/mesh/topology.go @@ -23,6 +23,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "k8s.io/apimachinery/pkg/types" "github.com/squat/kilo/pkg/wireguard" ) @@ -96,7 +97,7 @@ type segment struct { } // NewTopology creates a new Topology struct from a given set of nodes and peers. 
-func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Granularity, hostname string, port int, key wgtypes.Key, subnet *net.IPNet, serviceCIDRs []*net.IPNet, persistentKeepalive time.Duration, logger log.Logger) (*Topology, error) { +func NewTopology(nodes map[string]*Node, peers map[string]*Peer, pods map[types.UID]*Pod, granularity Granularity, hostname string, port int, key wgtypes.Key, subnet *net.IPNet, serviceCIDRs []*net.IPNet, persistentKeepalive time.Duration, logger log.Logger) (*Topology, error) { if logger == nil { logger = log.NewNopLogger() } @@ -176,6 +177,13 @@ func NewTopology(nodes map[string]*Node, peers map[string]*Peer, granularity Gra } cidrs = append(cidrs, node.Subnet) hostnames = append(hostnames, node.Name) + + for k := range pods { + if pods[k].NodeName == node.Name { + level.Debug(t.logger).Log("msg", "Add ip pod on allowedip wireguard", "nodename", node.Name, "allowedip", *pods[k].IP) + allowedIPs = append(allowedIPs, *pods[k].IP) + } + } } // The sorting has no function, but makes testing easier. sort.Slice(allowedLocationIPs, func(i, j int) bool {