Skip to main content

hydro_lang/compile/
deploy.rs

1use std::collections::{HashMap, HashSet};
2use std::io::Error;
3use std::marker::PhantomData;
4use std::pin::Pin;
5
6use bytes::{Bytes, BytesMut};
7use futures::{Sink, Stream};
8use proc_macro2::Span;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use slotmap::{SecondaryMap, SlotMap, SparseSecondaryMap};
12use stageleft::QuotedWithContext;
13
14use super::built::build_inner;
15use super::compiled::CompiledFlow;
16use super::deploy_provider::{
17    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
18};
19use super::ir::HydroRoot;
20use crate::live_collections::stream::{Ordering, Retries};
21use crate::location::dynamic::LocationId;
22use crate::location::external_process::{
23    ExternalBincodeBidi, ExternalBincodeSink, ExternalBincodeStream, ExternalBytesPort,
24};
25use crate::location::{Cluster, External, Location, LocationKey, LocationType, Process};
26use crate::staging_util::Invariant;
27use crate::telemetry::Sidecar;
28
/// A Hydro flow being configured for deployment on backend `D`.
///
/// Holds the Hydro IR together with the per-location deployment specs attached via the
/// `with_*` builder methods. [`DeployFlow::deploy`] consumes it and instantiates the nodes.
pub struct DeployFlow<'a, D>
where
    D: Deploy<'a>,
{
    /// Root nodes of the Hydro IR for this flow.
    pub(super) ir: Vec<HydroRoot>,

    /// Every location in the flow, keyed by [`LocationKey`], together with its kind.
    pub(super) locations: SlotMap<LocationKey, LocationType>,
    /// Name of each location; passed to the spec `build` calls and to sidecars.
    pub(super) location_names: SecondaryMap<LocationKey, String>,

    /// Deployed instances of each process in the flow
    pub(super) processes: SparseSecondaryMap<LocationKey, D::Process>,
    /// Deployed instances of each cluster in the flow.
    pub(super) clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
    /// Deployed instances of each external location in the flow.
    pub(super) externals: SparseSecondaryMap<LocationKey, D::External>,

    /// Sidecars which may be added to each location (process or cluster, not externals).
    /// See [`crate::telemetry::Sidecar`].
    pub(super) sidecars: SparseSecondaryMap<LocationKey, Vec<syn::Expr>>,

    /// Application name used in telemetry.
    pub(super) flow_name: String,

    /// Marker carrying `'a` and `D` (presumably invariant, per the `Invariant` name).
    pub(super) _phantom: Invariant<'a, D>,
}
52
53impl<'a, D: Deploy<'a>> DeployFlow<'a, D> {
54    pub fn ir(&self) -> &Vec<HydroRoot> {
55        &self.ir
56    }
57
58    /// Application name used in telemetry.
59    pub fn flow_name(&self) -> &str {
60        &self.flow_name
61    }
62
63    pub fn with_process<P>(
64        mut self,
65        process: &Process<P>,
66        spec: impl IntoProcessSpec<'a, D>,
67    ) -> Self {
68        self.processes.insert(
69            process.key,
70            spec.into_process_spec()
71                .build(process.key, &self.location_names[process.key]),
72        );
73        self
74    }
75
76    /// TODO(mingwei): unstable API
77    #[doc(hidden)]
78    pub fn with_process_erased(
79        mut self,
80        process_loc_key: LocationKey,
81        spec: impl IntoProcessSpec<'a, D>,
82    ) -> Self {
83        assert_eq!(
84            Some(&LocationType::Process),
85            self.locations.get(process_loc_key),
86            "No process with the given `LocationKey` was found."
87        );
88        self.processes.insert(
89            process_loc_key,
90            spec.into_process_spec()
91                .build(process_loc_key, &self.location_names[process_loc_key]),
92        );
93        self
94    }
95
96    pub fn with_remaining_processes<S: IntoProcessSpec<'a, D> + 'a>(
97        mut self,
98        spec: impl Fn() -> S,
99    ) -> Self {
100        for (location_key, &location_type) in self.locations.iter() {
101            if LocationType::Process == location_type {
102                self.processes
103                    .entry(location_key)
104                    .expect("location was removed")
105                    .or_insert_with(|| {
106                        spec()
107                            .into_process_spec()
108                            .build(location_key, &self.location_names[location_key])
109                    });
110            }
111        }
112        self
113    }
114
115    pub fn with_cluster<C>(mut self, cluster: &Cluster<C>, spec: impl ClusterSpec<'a, D>) -> Self {
116        self.clusters.insert(
117            cluster.key,
118            spec.build(cluster.key, &self.location_names[cluster.key]),
119        );
120        self
121    }
122
123    /// TODO(mingwei): unstable API
124    #[doc(hidden)]
125    pub fn with_cluster_erased(
126        mut self,
127        cluster_loc_key: LocationKey,
128        spec: impl ClusterSpec<'a, D>,
129    ) -> Self {
130        assert_eq!(
131            Some(&LocationType::Cluster),
132            self.locations.get(cluster_loc_key),
133            "No cluster with the given `LocationKey` was found."
134        );
135        self.clusters.insert(
136            cluster_loc_key,
137            spec.build(cluster_loc_key, &self.location_names[cluster_loc_key]),
138        );
139        self
140    }
141
142    pub fn with_remaining_clusters<S: ClusterSpec<'a, D> + 'a>(
143        mut self,
144        spec: impl Fn() -> S,
145    ) -> Self {
146        for (location_key, &location_type) in self.locations.iter() {
147            if LocationType::Cluster == location_type {
148                self.clusters
149                    .entry(location_key)
150                    .expect("location was removed")
151                    .or_insert_with(|| {
152                        spec().build(location_key, &self.location_names[location_key])
153                    });
154            }
155        }
156        self
157    }
158
159    pub fn with_external<P>(
160        mut self,
161        external: &External<P>,
162        spec: impl ExternalSpec<'a, D>,
163    ) -> Self {
164        self.externals.insert(
165            external.key,
166            spec.build(external.key, &self.location_names[external.key]),
167        );
168        self
169    }
170
171    pub fn with_remaining_externals<S: ExternalSpec<'a, D> + 'a>(
172        mut self,
173        spec: impl Fn() -> S,
174    ) -> Self {
175        for (location_key, &location_type) in self.locations.iter() {
176            if LocationType::External == location_type {
177                self.externals
178                    .entry(location_key)
179                    .expect("location was removed")
180                    .or_insert_with(|| {
181                        spec().build(location_key, &self.location_names[location_key])
182                    });
183            }
184        }
185        self
186    }
187
188    /// Adds a [`Sidecar`] to all processes and clusters in the flow.
189    pub fn with_sidecar_all(mut self, sidecar: &impl Sidecar) -> Self {
190        for (location_key, &location_type) in self.locations.iter() {
191            if !matches!(location_type, LocationType::Process | LocationType::Cluster) {
192                continue;
193            }
194
195            let location_name = &self.location_names[location_key];
196
197            let sidecar = sidecar.to_expr(
198                self.flow_name(),
199                location_key,
200                location_type,
201                location_name,
202                &quote::format_ident!("{}", super::DFIR_IDENT),
203            );
204            self.sidecars
205                .entry(location_key)
206                .expect("location was removed")
207                .or_default()
208                .push(sidecar);
209        }
210
211        self
212    }
213
214    /// Adds a [`Sidecar`] to the given location.
215    pub fn with_sidecar_internal(
216        mut self,
217        location_key: LocationKey,
218        sidecar: &impl Sidecar,
219    ) -> Self {
220        let location_type = self.locations[location_key];
221        let location_name = &self.location_names[location_key];
222        let sidecar = sidecar.to_expr(
223            self.flow_name(),
224            location_key,
225            location_type,
226            location_name,
227            &quote::format_ident!("{}", super::DFIR_IDENT),
228        );
229        self.sidecars
230            .entry(location_key)
231            .expect("location was removed")
232            .or_default()
233            .push(sidecar);
234        self
235    }
236
237    /// Adds a [`Sidecar`] to a specific process in the flow.
238    pub fn with_sidecar_process(self, process: &Process<()>, sidecar: &impl Sidecar) -> Self {
239        self.with_sidecar_internal(process.key, sidecar)
240    }
241
242    /// Adds a [`Sidecar`] to a specific cluster in the flow.
243    pub fn with_sidecar_cluster(self, cluster: &Cluster<()>, sidecar: &impl Sidecar) -> Self {
244        self.with_sidecar_internal(cluster.key, sidecar)
245    }
246
247    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) without networking.
248    /// Useful for generating Mermaid diagrams of the DFIR.
249    ///
250    /// (This returned DFIR will not compile due to the networking missing).
251    pub fn preview_compile(&mut self) -> CompiledFlow<'a> {
252        // NOTE: `build_inner` does not actually mutate the IR, but `&mut` is required
253        // only because the shared traversal logic requires it
254        CompiledFlow {
255            dfir: build_inner(&mut self.ir),
256            extra_stmts: SparseSecondaryMap::new(),
257            sidecars: SparseSecondaryMap::new(),
258            _phantom: PhantomData,
259        }
260    }
261
262    /// Compiles the flow into DFIR ([`dfir_lang::graph::DfirGraph`]) including networking.
263    ///
264    /// (This does not compile the DFIR itself, instead use [`Self::deploy`] to compile & deploy the DFIR).
265    pub fn compile(mut self) -> CompiledFlow<'a>
266    where
267        D: Deploy<'a, InstantiateEnv = ()>,
268    {
269        self.compile_internal(&mut ())
270    }
271
272    /// Same as [`Self::compile`] but does not invalidate `self`, for internal use.
273    ///
274    /// Empties `self.sidecars` and modifies `self.ir`, leaving `self` in a partial state.
275    pub(super) fn compile_internal(&mut self, env: &mut D::InstantiateEnv) -> CompiledFlow<'a> {
276        let mut seen_tees: HashMap<_, _> = HashMap::new();
277        let mut seen_cluster_members = HashSet::new();
278        let mut extra_stmts = SparseSecondaryMap::new();
279        for leaf in self.ir.iter_mut() {
280            leaf.compile_network::<D>(
281                &mut extra_stmts,
282                &mut seen_tees,
283                &mut seen_cluster_members,
284                &self.processes,
285                &self.clusters,
286                &self.externals,
287                env,
288            );
289        }
290
291        CompiledFlow {
292            dfir: build_inner(&mut self.ir),
293            extra_stmts,
294            sidecars: std::mem::take(&mut self.sidecars),
295            _phantom: PhantomData,
296        }
297    }
298
299    /// Creates the variables for cluster IDs and adds them into `extra_stmts`.
300    fn cluster_id_stmts(&self, extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>) {
301        #[expect(
302            clippy::disallowed_methods,
303            reason = "nondeterministic iteration order, will be sorted"
304        )]
305        let mut all_clusters_sorted = self.clusters.keys().collect::<Vec<_>>();
306        all_clusters_sorted.sort();
307
308        for cluster_key in all_clusters_sorted {
309            let self_id_ident = syn::Ident::new(
310                &format!("__hydro_lang_cluster_self_id_{}", cluster_key),
311                Span::call_site(),
312            );
313            let self_id_expr = D::cluster_self_id().splice_untyped();
314            extra_stmts
315                .entry(cluster_key)
316                .expect("location was removed")
317                .or_default()
318                .push(syn::parse_quote! {
319                    let #self_id_ident = &*Box::leak(Box::new(#self_id_expr));
320                });
321
322            let process_cluster_locations = self.location_names.keys().filter(|&location_key| {
323                self.processes.contains_key(location_key)
324                    || self.clusters.contains_key(location_key)
325            });
326            for other_location in process_cluster_locations {
327                let other_id_ident = syn::Ident::new(
328                    &format!("__hydro_lang_cluster_ids_{}", cluster_key),
329                    Span::call_site(),
330                );
331                let other_id_expr = D::cluster_ids(cluster_key).splice_untyped();
332                extra_stmts
333                    .entry(other_location)
334                    .expect("location was removed")
335                    .or_default()
336                    .push(syn::parse_quote! {
337                        let #other_id_ident = #other_id_expr;
338                    });
339            }
340        }
341    }
342
343    /// Compiles and deploys the flow.
344    ///
345    /// Rough outline of steps:
346    /// * Compiles the Hydro into DFIR.
347    /// * Instantiates nodes as configured.
348    /// * Compiles the corresponding DFIR into binaries for nodes as needed.
349    /// * Connects up networking as needed.
350    #[must_use]
351    pub fn deploy(mut self, env: &mut D::InstantiateEnv) -> DeployResult<'a, D> {
352        let CompiledFlow {
353            dfir,
354            mut extra_stmts,
355            mut sidecars,
356            _phantom,
357        } = self.compile_internal(env);
358
359        let mut compiled = dfir;
360        self.cluster_id_stmts(&mut extra_stmts);
361        let mut meta = D::Meta::default();
362
363        let (processes, clusters, externals) = (
364            self.processes
365                .into_iter()
366                .filter(|&(node_key, ref node)| {
367                    if let Some(ir) = compiled.remove(node_key) {
368                        node.instantiate(
369                            env,
370                            &mut meta,
371                            ir,
372                            extra_stmts.remove(node_key).as_deref().unwrap_or_default(),
373                            sidecars.remove(node_key).as_deref().unwrap_or_default(),
374                        );
375                        true
376                    } else {
377                        false
378                    }
379                })
380                .collect::<SparseSecondaryMap<_, _>>(),
381            self.clusters
382                .into_iter()
383                .filter(|&(cluster_key, ref cluster)| {
384                    if let Some(ir) = compiled.remove(cluster_key) {
385                        cluster.instantiate(
386                            env,
387                            &mut meta,
388                            ir,
389                            extra_stmts
390                                .remove(cluster_key)
391                                .as_deref()
392                                .unwrap_or_default(),
393                            sidecars.remove(cluster_key).as_deref().unwrap_or_default(),
394                        );
395                        true
396                    } else {
397                        false
398                    }
399                })
400                .collect::<SparseSecondaryMap<_, _>>(),
401            self.externals
402                .into_iter()
403                .inspect(|&(external_key, ref external)| {
404                    assert!(!extra_stmts.contains_key(external_key));
405                    assert!(!sidecars.contains_key(external_key));
406                    external.instantiate(env, &mut meta, Default::default(), &[], &[]);
407                })
408                .collect::<SparseSecondaryMap<_, _>>(),
409        );
410
411        for location_key in self.locations.keys() {
412            if let Some(node) = processes.get(location_key) {
413                node.update_meta(&meta);
414            } else if let Some(cluster) = clusters.get(location_key) {
415                cluster.update_meta(&meta);
416            } else if let Some(external) = externals.get(location_key) {
417                external.update_meta(&meta);
418            }
419        }
420
421        let mut seen_tees_connect = HashMap::new();
422        for leaf in self.ir.iter_mut() {
423            leaf.connect_network(&mut seen_tees_connect);
424        }
425
426        DeployResult {
427            location_names: self.location_names,
428            processes,
429            clusters,
430            externals,
431        }
432    }
433}
434
/// Handles to the nodes instantiated by [`DeployFlow::deploy`].
///
/// Only processes and clusters for which DFIR was compiled are present; every configured
/// external is present.
pub struct DeployResult<'a, D: Deploy<'a>> {
    /// Name of every location in the originating flow.
    location_names: SecondaryMap<LocationKey, String>,
    /// Instantiated processes, keyed by location.
    processes: SparseSecondaryMap<LocationKey, D::Process>,
    /// Instantiated clusters, keyed by location.
    clusters: SparseSecondaryMap<LocationKey, D::Cluster>,
    /// Instantiated external locations, keyed by location.
    externals: SparseSecondaryMap<LocationKey, D::External>,
}
441
442impl<'a, D: Deploy<'a>> DeployResult<'a, D> {
443    pub fn get_process<P>(&self, p: &Process<P>) -> &D::Process {
444        let LocationId::Process(location_key) = p.id() else {
445            panic!("Process ID expected")
446        };
447        self.processes.get(location_key).unwrap()
448    }
449
450    pub fn get_cluster<C>(&self, c: &Cluster<'a, C>) -> &D::Cluster {
451        let LocationId::Cluster(location_key) = c.id() else {
452            panic!("Cluster ID expected")
453        };
454        self.clusters.get(location_key).unwrap()
455    }
456
457    pub fn get_external<P>(&self, e: &External<P>) -> &D::External {
458        self.externals.get(e.key).unwrap()
459    }
460
461    pub fn get_all_processes(&self) -> impl Iterator<Item = (LocationId, &str, &D::Process)> {
462        self.location_names
463            .iter()
464            .filter_map(|(location_key, location_name)| {
465                self.processes
466                    .get(location_key)
467                    .map(|process| (LocationId::Process(location_key), &**location_name, process))
468            })
469    }
470
471    pub fn get_all_clusters(&self) -> impl Iterator<Item = (LocationId, &str, &D::Cluster)> {
472        self.location_names
473            .iter()
474            .filter_map(|(location_key, location_name)| {
475                self.clusters
476                    .get(location_key)
477                    .map(|cluster| (LocationId::Cluster(location_key), &**location_name, cluster))
478            })
479    }
480
481    #[deprecated(note = "use `connect` instead")]
482    pub async fn connect_bytes<M>(
483        &self,
484        port: ExternalBytesPort<M>,
485    ) -> (
486        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
487        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
488    ) {
489        self.connect(port).await
490    }
491
492    #[deprecated(note = "use `connect` instead")]
493    pub async fn connect_sink_bytes<M>(
494        &self,
495        port: ExternalBytesPort<M>,
496    ) -> Pin<Box<dyn Sink<Bytes, Error = Error>>> {
497        self.connect(port).await.1
498    }
499
500    pub async fn connect_bincode<
501        InT: Serialize + 'static,
502        OutT: DeserializeOwned + 'static,
503        Many,
504    >(
505        &self,
506        port: ExternalBincodeBidi<InT, OutT, Many>,
507    ) -> (
508        Pin<Box<dyn Stream<Item = OutT>>>,
509        Pin<Box<dyn Sink<InT, Error = Error>>>,
510    ) {
511        self.externals
512            .get(port.process_key)
513            .unwrap()
514            .as_bincode_bidi(port.port_id)
515            .await
516    }
517
518    #[deprecated(note = "use `connect` instead")]
519    pub async fn connect_sink_bincode<T: Serialize + DeserializeOwned + 'static, Many>(
520        &self,
521        port: ExternalBincodeSink<T, Many>,
522    ) -> Pin<Box<dyn Sink<T, Error = Error>>> {
523        self.connect(port).await
524    }
525
526    #[deprecated(note = "use `connect` instead")]
527    pub async fn connect_source_bytes(
528        &self,
529        port: ExternalBytesPort,
530    ) -> Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>> {
531        self.connect(port).await.0
532    }
533
534    #[deprecated(note = "use `connect` instead")]
535    pub async fn connect_source_bincode<
536        T: Serialize + DeserializeOwned + 'static,
537        O: Ordering,
538        R: Retries,
539    >(
540        &self,
541        port: ExternalBincodeStream<T, O, R>,
542    ) -> Pin<Box<dyn Stream<Item = T>>> {
543        self.connect(port).await
544    }
545
546    pub async fn connect<'b, P: ConnectableAsync<&'b Self>>(
547        &'b self,
548        port: P,
549    ) -> <P as ConnectableAsync<&'b Self>>::Output {
550        port.connect(self).await
551    }
552}
553
#[cfg(stageleft_runtime)]
#[cfg(feature = "deploy")]
#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
impl DeployResult<'_, crate::deploy::HydroDeploy> {
    /// Get the raw port handle.
    ///
    /// Looks up the external location that owns `port` and returns its underlying
    /// `CustomClientPort`. Panics if that external is not part of this deployment.
    pub fn raw_port<M>(
        &self,
        port: ExternalBytesPort<M>,
    ) -> hydro_deploy::custom_service::CustomClientPort {
        let owning_external = self.externals.get(port.process_key).unwrap();
        owning_external.raw_port(port.port_id)
    }
}
569
/// A port-like value that can be asynchronously connected against a context `Ctx`.
///
/// Implemented (at least) for [`ExternalBytesPort`], [`ExternalBincodeStream`] and
/// [`ExternalBincodeSink`] with `Ctx = &DeployResult<'a, D>`. [`DeployResult::connect`]
/// dispatches through this trait.
pub trait ConnectableAsync<Ctx> {
    /// The handle produced once the connection is established (e.g. a stream and/or sink).
    type Output;

    /// Consumes `self` and connects it using `ctx`.
    fn connect(self, ctx: Ctx) -> impl Future<Output = Self::Output>;
}
575
576impl<'a, D: Deploy<'a>, M> ConnectableAsync<&DeployResult<'a, D>> for ExternalBytesPort<M> {
577    type Output = (
578        Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
579        Pin<Box<dyn Sink<Bytes, Error = Error>>>,
580    );
581
582    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
583        ctx.externals
584            .get(self.process_key)
585            .unwrap()
586            .as_bytes_bidi(self.port_id)
587            .await
588    }
589}
590
591impl<'a, D: Deploy<'a>, T: DeserializeOwned + 'static, O: Ordering, R: Retries>
592    ConnectableAsync<&DeployResult<'a, D>> for ExternalBincodeStream<T, O, R>
593{
594    type Output = Pin<Box<dyn Stream<Item = T>>>;
595
596    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
597        ctx.externals
598            .get(self.process_key)
599            .unwrap()
600            .as_bincode_source(self.port_id)
601            .await
602    }
603}
604
605impl<'a, D: Deploy<'a>, T: Serialize + 'static, Many> ConnectableAsync<&DeployResult<'a, D>>
606    for ExternalBincodeSink<T, Many>
607{
608    type Output = Pin<Box<dyn Sink<T, Error = Error>>>;
609
610    async fn connect(self, ctx: &DeployResult<'a, D>) -> Self::Output {
611        ctx.externals
612            .get(self.process_key)
613            .unwrap()
614            .as_bincode_sink(self.port_id)
615            .await
616    }
617}