Skip to main content

hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
/// Represents a docker network that the deployment's containers are attached to.
///
/// Constructing this value does not talk to Docker: the network is created by
/// `DockerDeploy::start` and removed by `DockerDeploy::cleanup`.
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Name handed to Docker when the network is created, joined, or removed.
    name: String,
}
47
48impl DockerNetwork {
49    /// creates a new docker network (will actually be created when deployment.start() is called).
50    pub fn new(name: String) -> Self {
51        Self {
52            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53        }
54    }
55}
56
/// Represents a process running in a docker container
#[derive(Clone)]
pub struct DockerDeployProcess {
    /// Location key of this process; in the code visible here it is only recorded in
    /// tracing spans.
    key: LocationKey,
    /// Process name. Used as the docker image tag and as the base of the container name.
    name: String,
    /// Next port number handed out by `Node::next_port`; the counter is shared by clones.
    next_port: Rc<RefCell<u16>>,
    /// Crate to compile for this process. Filled in by `Node::instantiate` and taken
    /// (leaving `None`) when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Ports written as `EXPOSE <port>/tcp` lines into the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Name of the container for this process; `None` until `DockerDeploy::start` sets it.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Extra compiler flags, passed to the build as `rustflags` when present.
    compilation_options: Option<String>,

    /// Entries forwarded one by one to `RustCrate::config` before building.
    config: Vec<String>,

    /// Docker network associated with this process.
    network: DockerNetwork,
}
75
76impl Node for DockerDeployProcess {
77    type Port = u16;
78    type Meta = ();
79    type InstantiateEnv = DockerDeploy;
80
81    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82    fn next_port(&self) -> Self::Port {
83        let port = {
84            let mut borrow = self.next_port.borrow_mut();
85            let port = *borrow;
86            *borrow += 1;
87            port
88        };
89
90        port
91    }
92
93    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94    fn update_meta(&self, _meta: &Self::Meta) {}
95
96    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97    fn instantiate(
98        &self,
99        _env: &mut Self::InstantiateEnv,
100        meta: &mut Self::Meta,
101        graph: DfirGraph,
102        extra_stmts: &[syn::Stmt],
103        sidecars: &[syn::Expr],
104    ) {
105        let (bin_name, config) = create_graph_trybuild(
106            graph,
107            extra_stmts,
108            sidecars,
109            Some(&self.name),
110            crate::compile::trybuild::generate::DeployMode::Containerized,
111            LinkingMode::Static,
112        );
113
114        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
115            .target_dir(config.target_dir)
116            .example(bin_name)
117            .no_default_features();
118
119        ret = ret.display_name("test_display_name");
120
121        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123        if let Some(features) = config.features {
124            ret = ret.features(features);
125        }
126
127        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128        ret = ret.config("build.incremental = false");
129
130        *self.rust_crate.borrow_mut() = Some(ret);
131    }
132}
133
/// Represents a logical cluster, which can be a variable amount of individual containers.
#[derive(Clone)]
pub struct DockerDeployCluster {
    /// Location key of this cluster; in the code visible here it is only recorded in
    /// tracing spans.
    key: LocationKey,
    /// Cluster name. Used as the docker image tag and as the base of each container name.
    name: String,
    /// Next port number handed out by `Node::next_port`; the counter is shared by clones.
    next_port: Rc<RefCell<u16>>,
    /// Crate to compile for this cluster. Filled in by `Node::instantiate` and taken
    /// (leaving `None`) when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Names of the cluster's containers, pushed by `DockerDeploy::start` in member order.
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Extra compiler flags, passed to the build as `rustflags` when present.
    compilation_options: Option<String>,

    /// Entries forwarded one by one to `RustCrate::config` before building.
    config: Vec<String>,

    /// Number of containers started (and later killed/removed) for this cluster.
    count: usize,
}
150
151impl Node for DockerDeployCluster {
152    type Port = u16;
153    type Meta = ();
154    type InstantiateEnv = DockerDeploy;
155
156    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157    fn next_port(&self) -> Self::Port {
158        let port = {
159            let mut borrow = self.next_port.borrow_mut();
160            let port = *borrow;
161            *borrow += 1;
162            port
163        };
164
165        port
166    }
167
168    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169    fn update_meta(&self, _meta: &Self::Meta) {}
170
171    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172    fn instantiate(
173        &self,
174        _env: &mut Self::InstantiateEnv,
175        _meta: &mut Self::Meta,
176        graph: DfirGraph,
177        extra_stmts: &[syn::Stmt],
178        sidecars: &[syn::Expr],
179    ) {
180        let (bin_name, config) = create_graph_trybuild(
181            graph,
182            extra_stmts,
183            sidecars,
184            Some(&self.name),
185            crate::compile::trybuild::generate::DeployMode::Containerized,
186            LinkingMode::Static,
187        );
188
189        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
190            .target_dir(config.target_dir)
191            .example(bin_name)
192            .no_default_features();
193
194        ret = ret.display_name("test_display_name");
195
196        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198        if let Some(features) = config.features {
199            ret = ret.features(features);
200        }
201
202        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203        ret = ret.config("build.incremental = false");
204
205        *self.rust_crate.borrow_mut() = Some(ret);
206    }
207}
208
/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
#[derive(Clone, Debug)]
pub struct DockerDeployExternal {
    /// Name of the external process; in the code visible here it is only recorded in
    /// tracing spans.
    name: String,
    /// Next port number handed out by `Node::next_port`; the counter is shared by clones.
    next_port: Rc<RefCell<u16>>,

    /// External port id -> port number, as recorded by `RegisterPort::register`.
    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,

    /// Keyed by the port numbers stored in `ports`. Each entry holds the target container's
    /// name cell, the port inside that container, and a docker network (the network is
    /// ignored by the lookups visible here).
    #[expect(clippy::type_complexity, reason = "internal code")]
    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
}
220
221impl Node for DockerDeployExternal {
222    type Port = u16;
223    type Meta = ();
224    type InstantiateEnv = DockerDeploy;
225
226    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227    fn next_port(&self) -> Self::Port {
228        let port = {
229            let mut borrow = self.next_port.borrow_mut();
230            let port = *borrow;
231            *borrow += 1;
232            port
233        };
234
235        port
236    }
237
238    #[instrument(level = "trace", skip_all, fields(name = self.name))]
239    fn update_meta(&self, _meta: &Self::Meta) {}
240
241    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242    fn instantiate(
243        &self,
244        _env: &mut Self::InstantiateEnv,
245        meta: &mut Self::Meta,
246        graph: DfirGraph,
247        extra_stmts: &[syn::Stmt],
248        sidecars: &[syn::Expr],
249    ) {
250        trace!(name: "surface", surface = graph.surface_syntax_string());
251    }
252}
253
/// A pinned, boxed (stream, sink) pair: the stream yields `Out` items and the sink accepts
/// `In` items, failing with `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262        self.ports.borrow_mut().insert(external_port_id, port);
263    }
264
265    fn as_bytes_bidi(
266        &self,
267        external_port_id: ExternalPortId,
268    ) -> impl Future<
269        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270    > + 'a {
271        let guard =
272            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275        let (docker_container_name, remote_port, _) = self
276            .connection_info
277            .borrow()
278            .get(&local_port)
279            .unwrap()
280            .clone();
281
282        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284        async move {
285            let local_port =
286                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287            let remote_ip_address = "localhost";
288
289            trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292                .await
293                .unwrap();
294
295            trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297            let (rx, tx) = stream.into_split();
298
299            let source = Box::pin(
300                FramedRead::new(rx, LengthDelimitedCodec::new()),
301            ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303            let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304                as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306            (source, sink)
307        }
308        .instrument(guard.exit())
309    }
310
311    fn as_bincode_bidi<InT, OutT>(
312        &self,
313        external_port_id: ExternalPortId,
314    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315    where
316        InT: serde::Serialize + 'static,
317        OutT: serde::de::DeserializeOwned + 'static,
318    {
319        let guard =
320            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323        let (docker_container_name, remote_port, _) = self
324            .connection_info
325            .borrow()
326            .get(&local_port)
327            .unwrap()
328            .clone();
329
330        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332        async move {
333            let local_port =
334                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335            let remote_ip_address = "localhost";
336
337            trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340                .await
341                .unwrap();
342
343            trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345            let (rx, tx) = stream.into_split();
346
347            let source = Box::pin(
348                FramedRead::new(rx, LengthDelimitedCodec::new())
349                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350            ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352            let sink = Box::pin(
353                FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354                    Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355                }),
356            ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358            (source, sink)
359        }
360        .instrument(guard.exit())
361    }
362
363    fn as_bincode_sink<T>(
364        &self,
365        external_port_id: ExternalPortId,
366    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367    where
368        T: serde::Serialize + 'static,
369    {
370        let guard =
371            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374        let (docker_container_name, remote_port, _) = self
375            .connection_info
376            .borrow()
377            .get(&local_port)
378            .unwrap()
379            .clone();
380
381        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383        async move {
384            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385            let remote_ip_address = "localhost";
386
387            Box::pin(
388                LazySink::new(move || {
389                    Box::pin(async move {
390                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392                        let stream =
393                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394                                .await?;
395
396                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
399                            stream,
400                            LengthDelimitedCodec::new(),
401                        ))
402                    })
403                })
404                .with(move |v| async move {
405                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406                }),
407            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408        }
409        .instrument(guard.exit())
410    }
411
412    fn as_bincode_source<T>(
413        &self,
414        external_port_id: ExternalPortId,
415    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416    where
417        T: serde::de::DeserializeOwned + 'static,
418    {
419        let guard =
420            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423        let (docker_container_name, remote_port, _) = self
424            .connection_info
425            .borrow()
426            .get(&local_port)
427            .unwrap()
428            .clone();
429
430        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432        async move {
433
434            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435            let remote_ip_address = "localhost";
436
437            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440                .await
441                .unwrap();
442
443            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445            Box::pin(
446                FramedRead::new(stream, LengthDelimitedCodec::new())
447                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448            ) as Pin<Box<dyn Stream<Item = T>>>
449        }
450        .instrument(guard.exit())
451    }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456    docker_container_name: &str,
457    destination_port: u16,
458) -> u16 {
459    let docker = Docker::connect_with_local_defaults().unwrap();
460
461    let container_info = docker
462        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463        .await
464        .unwrap();
465
466    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
469    let remote_port = container_info
470        .network_settings
471        .as_ref()
472        .unwrap()
473        .ports
474        .as_ref()
475        .unwrap()
476        .get(&format!("{destination_port}/tcp"))
477        .unwrap()
478        .as_ref()
479        .unwrap()
480        .iter()
481        .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482        .unwrap()
483        .host_port
484        .as_ref()
485        .unwrap()
486        .parse()
487        .unwrap();
488
489    remote_port
490}
491
/// For deploying to a local docker instance
pub struct DockerDeploy {
    /// Specs of every process added through `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpec>,
    /// Specs of every cluster added through `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpec>,
    /// Network that `start` creates, containers join, and `cleanup` removes.
    network: DockerNetwork,
    /// Random 6-character id generated in `new`; passed to each container as the
    /// `DEPLOYMENT_INSTANCE` environment variable.
    deployment_instance: String,
}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502    docker: &Docker,
503    container_name: &str,
504    image_name: &str,
505    network_name: &str,
506    deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508    let config = ContainerCreateBody {
509        image: Some(image_name.to_owned()),
510        hostname: Some(container_name.to_owned()),
511        host_config: Some(HostConfig {
512            binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513            publish_all_ports: Some(true),
514            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
515            ..Default::default()
516        }),
517        env: Some(vec![
518            format!("CONTAINER_NAME={container_name}"),
519            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520            format!("RUST_LOG=trace"),
521        ]),
522        networking_config: Some(NetworkingConfig {
523            endpoints_config: Some(HashMap::from([(
524                network_name.to_owned(),
525                EndpointSettings {
526                    ..Default::default()
527                },
528            )])),
529        }),
530        tty: Some(true),
531        ..Default::default()
532    };
533
534    let options = CreateContainerOptions {
535        name: Some(container_name.to_owned()),
536        ..Default::default()
537    };
538
539    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540    docker.create_container(Some(options), config).await?;
541    docker
542        .start_container(container_name, None::<StartContainerOptions>)
543        .await?;
544
545    Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551    compilation_options: Option<&str>,
552    config: &[String],
553    exposed_ports: &[u16],
554    image_name: &str,
555) -> Result<(), anyhow::Error> {
556    let mut rust_crate = rust_crate
557        .borrow_mut()
558        .take()
559        .unwrap()
560        .rustflags(compilation_options.unwrap_or_default());
561
562    for cfg in config {
563        rust_crate = rust_crate.config(cfg);
564    }
565
566    let build_output = match build_crate_memoized(
567        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568    )
569    .await
570    {
571        Ok(build_output) => build_output,
572        Err(BuildError::FailedToBuildCrate {
573            exit_status,
574            diagnostics,
575            text_lines,
576            stderr_lines,
577        }) => {
578            let diagnostics = diagnostics
579                .into_iter()
580                .map(|d| d.rendered.unwrap())
581                .collect::<Vec<_>>()
582                .join("\n");
583            let text_lines = text_lines.join("\n");
584            let stderr_lines = stderr_lines.join("\n");
585
586            anyhow::bail!(
587                r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611            );
612        }
613        Err(err) => {
614            anyhow::bail!("Failed to build crate {err:?}");
615        }
616    };
617
618    let docker = Docker::connect_with_local_defaults()?;
619
620    let mut tar_data = Vec::new();
621    {
622        let mut tar = Builder::new(&mut tar_data);
623
624        let exposed_ports = exposed_ports
625            .iter()
626            .map(|port| format!("EXPOSE {port}/tcp"))
627            .collect::<Vec<_>>()
628            .join("\n");
629
630        let dockerfile_content = format!(
631            r#"
632                FROM scratch
633                {exposed_ports}
634                COPY app /app
635                CMD ["/app"]
636            "#,
637        );
638
639        trace!(name: "dockerfile", %dockerfile_content);
640
641        let mut header = Header::new_gnu();
642        header.set_path("Dockerfile")?;
643        header.set_size(dockerfile_content.len() as u64);
644        header.set_cksum();
645        tar.append(&header, dockerfile_content.as_bytes())?;
646
647        let mut header = Header::new_gnu();
648        header.set_path("app")?;
649        header.set_size(build_output.bin_data.len() as u64);
650        header.set_mode(0o755);
651        header.set_cksum();
652        tar.append(&header, &build_output.bin_data[..])?;
653
654        tar.finish()?;
655    }
656
657    let build_options = BuildImageOptions {
658        dockerfile: "Dockerfile".to_owned(),
659        t: Some(image_name.to_owned()),
660        rm: true,
661        ..Default::default()
662    };
663
664    use bollard::errors::Error;
665
666    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667    let mut build_stream = docker.build_image(build_options, None, Some(body));
668    while let Some(msg) = build_stream.next().await {
669        match msg {
670            Ok(_) => {}
671            Err(e) => match e {
672                Error::DockerStreamError { error } => {
673                    return Err(anyhow::anyhow!(
674                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
675                    ));
676                }
677                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678            },
679        }
680    }
681
682    Ok(())
683}
684
685impl DockerDeploy {
686    /// Create a new deployment
687    pub fn new(network: DockerNetwork) -> Self {
688        Self {
689            docker_processes: Vec::new(),
690            docker_clusters: Vec::new(),
691            network,
692            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693        }
694    }
695
696    /// Add an internal docker service to the deployment.
697    pub fn add_localhost_docker(
698        &mut self,
699        compilation_options: Option<String>,
700        config: Vec<String>,
701    ) -> DockerDeployProcessSpec {
702        let process = DockerDeployProcessSpec {
703            compilation_options,
704            config,
705            network: self.network.clone(),
706            deployment_instance: self.deployment_instance.clone(),
707        };
708
709        self.docker_processes.push(process.clone());
710
711        process
712    }
713
714    /// Add an internal docker cluster to the deployment.
715    pub fn add_localhost_docker_cluster(
716        &mut self,
717        compilation_options: Option<String>,
718        config: Vec<String>,
719        count: usize,
720    ) -> DockerDeployClusterSpec {
721        let cluster = DockerDeployClusterSpec {
722            compilation_options,
723            config,
724            count,
725            deployment_instance: self.deployment_instance.clone(),
726        };
727
728        self.docker_clusters.push(cluster.clone());
729
730        cluster
731    }
732
733    /// Add an external process to the deployment.
734    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
735        DockerDeployExternalSpec { name }
736    }
737
738    /// Get the deployment instance from this deployment.
739    pub fn get_deployment_instance(&self) -> String {
740        self.deployment_instance.clone()
741    }
742
743    /// Create docker images.
744    #[instrument(level = "trace", skip_all)]
745    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746        for (_, _, process) in nodes.get_all_processes() {
747            let exposed_ports = process.exposed_ports.borrow().clone();
748
749            build_and_create_image(
750                &process.rust_crate,
751                process.compilation_options.as_deref(),
752                &process.config,
753                &exposed_ports,
754                &process.name,
755            )
756            .await?;
757        }
758
759        for (_, _, cluster) in nodes.get_all_clusters() {
760            build_and_create_image(
761                &cluster.rust_crate,
762                cluster.compilation_options.as_deref(),
763                &cluster.config,
764                &[], // clusters don't have exposed ports.
765                &cluster.name,
766            )
767            .await?;
768        }
769
770        Ok(())
771    }
772
773    /// Start the deployment, tell docker to create containers from the existing provisioned images.
774    #[instrument(level = "trace", skip_all)]
775    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776        let docker = Docker::connect_with_local_defaults()?;
777
778        match docker
779            .create_network(NetworkCreateRequest {
780                name: self.network.name.clone(),
781                driver: Some("bridge".to_owned()),
782                ..Default::default()
783            })
784            .await
785        {
786            Ok(v) => v.id,
787            Err(e) => {
788                panic!("Failed to create docker network: {e:?}");
789            }
790        };
791
792        for (_, _, process) in nodes.get_all_processes() {
793            let docker_container_name: String = get_docker_container_name(&process.name, None);
794            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796            create_and_start_container(
797                &docker,
798                &docker_container_name,
799                &process.name,
800                &self.network.name,
801                &self.deployment_instance,
802            )
803            .await?;
804        }
805
806        for (_, _, cluster) in nodes.get_all_clusters() {
807            for num in 0..cluster.count {
808                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809                cluster
810                    .docker_container_name
811                    .borrow_mut()
812                    .push(docker_container_name.clone());
813
814                create_and_start_container(
815                    &docker,
816                    &docker_container_name,
817                    &cluster.name,
818                    &self.network.name,
819                    &self.deployment_instance,
820                )
821                .await?;
822            }
823        }
824
825        Ok(())
826    }
827
828    /// Stop the deployment, destroy all containers
829    #[instrument(level = "trace", skip_all)]
830    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831        let docker = Docker::connect_with_local_defaults()?;
832
833        for (_, _, process) in nodes.get_all_processes() {
834            let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836            docker
837                .kill_container(&docker_container_name, None::<KillContainerOptions>)
838                .await?;
839        }
840
841        for (_, _, cluster) in nodes.get_all_clusters() {
842            for num in 0..cluster.count {
843                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845                docker
846                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
847                    .await?;
848            }
849        }
850
851        Ok(())
852    }
853
854    /// remove containers, images, and networks.
855    #[instrument(level = "trace", skip_all)]
856    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857        let docker = Docker::connect_with_local_defaults()?;
858
859        for (_, _, process) in nodes.get_all_processes() {
860            let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862            docker
863                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864                .await?;
865        }
866
867        for (_, _, cluster) in nodes.get_all_clusters() {
868            for num in 0..cluster.count {
869                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871                docker
872                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873                    .await?;
874            }
875        }
876
877        docker
878            .remove_network(&self.network.name)
879            .await
880            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882        use bollard::query_parameters::RemoveImageOptions;
883
884        for (_, _, process) in nodes.get_all_processes() {
885            docker
886                .remove_image(&process.name, None::<RemoveImageOptions>, None)
887                .await?;
888        }
889
890        for (_, _, cluster) in nodes.get_all_clusters() {
891            docker
892                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893                .await?;
894        }
895
896        Ok(())
897    }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901    type Meta = ();
902    type InstantiateEnv = Self;
903
904    type Process = DockerDeployProcess;
905    type Cluster = DockerDeployCluster;
906    type External = DockerDeployExternal;
907
908    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
909    fn o2o_sink_source(
910        _env: &mut Self::InstantiateEnv,
911        p1: &Self::Process,
912        p1_port: &<Self::Process as Node>::Port,
913        p2: &Self::Process,
914        p2_port: &<Self::Process as Node>::Port,
915        _name: Option<&str>,
916    ) -> (syn::Expr, syn::Expr) {
917        let bind_addr = format!("0.0.0.0:{}", p2_port);
918        let target = format!("{}:{p2_port}", p2.name);
919
920        deploy_containerized_o2o(target.as_str(), bind_addr.as_str())
921    }
922
923    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, p2_port))]
924    fn o2o_connect(
925        p1: &Self::Process,
926        p1_port: &<Self::Process as Node>::Port,
927        p2: &Self::Process,
928        p2_port: &<Self::Process as Node>::Port,
929    ) -> Box<dyn FnOnce()> {
930        let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
931
932        Box::new(move || {
933            trace!(name: "o2o_connect thunk", %serialized);
934        })
935    }
936
    /// Builds the expression pair for a process-to-cluster channel.
    ///
    /// Only `c2_port` influences the result: it is forwarded to
    /// `deploy_containerized_o2m`, whose return value is passed through unchanged.
    /// `p1`, `p1_port` and `c2` are used solely as fields of the tracing span; `_env` and
    /// `_name` are ignored.
    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
    fn o2m_sink_source(
        _env: &mut Self::InstantiateEnv,
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
        _name: Option<&str>,
    ) -> (syn::Expr, syn::Expr) {
        deploy_containerized_o2m(*c2_port)
    }
948
949    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
950    fn o2m_connect(
951        p1: &Self::Process,
952        p1_port: &<Self::Process as Node>::Port,
953        c2: &Self::Cluster,
954        c2_port: &<Self::Cluster as Node>::Port,
955    ) -> Box<dyn FnOnce()> {
956        let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
957
958        Box::new(move || {
959            trace!(name: "o2m_connect thunk", %serialized);
960        })
961    }
962
    /// Builds the expression pair for a cluster-to-process channel.
    ///
    /// The receiving process' port and name are forwarded to `deploy_containerized_m2o`,
    /// whose return value is passed through unchanged. `c1` and `c1_port` are used solely
    /// as fields of the tracing span; `_env` and `_name` are ignored.
    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
    fn m2o_sink_source(
        _env: &mut Self::InstantiateEnv,
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        _name: Option<&str>,
    ) -> (syn::Expr, syn::Expr) {
        deploy_containerized_m2o(*p2_port, &p2.name)
    }
974
975    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
976    fn m2o_connect(
977        c1: &Self::Cluster,
978        c1_port: &<Self::Cluster as Node>::Port,
979        p2: &Self::Process,
980        p2_port: &<Self::Process as Node>::Port,
981    ) -> Box<dyn FnOnce()> {
982        let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
983
984        Box::new(move || {
985            trace!(name: "m2o_connect thunk", %serialized);
986        })
987    }
988
    /// Builds the expression pair for a cluster-to-cluster channel.
    ///
    /// Only `c2_port` influences the result: it is forwarded to
    /// `deploy_containerized_m2m`, whose return value is passed through unchanged.
    /// `c1`, `c1_port` and `c2` are used solely as fields of the tracing span; `_env` and
    /// `_name` are ignored.
    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
    fn m2m_sink_source(
        _env: &mut Self::InstantiateEnv,
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
        _name: Option<&str>,
    ) -> (syn::Expr, syn::Expr) {
        deploy_containerized_m2m(*c2_port)
    }
1000
1001    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1002    fn m2m_connect(
1003        c1: &Self::Cluster,
1004        c1_port: &<Self::Cluster as Node>::Port,
1005        c2: &Self::Cluster,
1006        c2_port: &<Self::Cluster as Node>::Port,
1007    ) -> Box<dyn FnOnce()> {
1008        let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1009
1010        Box::new(move || {
1011            trace!(name: "m2m_connect thunk", %serialized);
1012        })
1013    }
1014
1015    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1016    fn e2o_many_source(
1017        extra_stmts: &mut Vec<syn::Stmt>,
1018        p2: &Self::Process,
1019        p2_port: &<Self::Process as Node>::Port,
1020        codec_type: &syn::Type,
1021        shared_handle: String,
1022    ) -> syn::Expr {
1023        p2.exposed_ports.borrow_mut().push(*p2_port);
1024
1025        let socket_ident = syn::Ident::new(
1026            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1027            Span::call_site(),
1028        );
1029
1030        let source_ident = syn::Ident::new(
1031            &format!("__hydro_deploy_many_{}_source", &shared_handle),
1032            Span::call_site(),
1033        );
1034
1035        let sink_ident = syn::Ident::new(
1036            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1037            Span::call_site(),
1038        );
1039
1040        let membership_ident = syn::Ident::new(
1041            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1042            Span::call_site(),
1043        );
1044
1045        let bind_addr = format!("0.0.0.0:{}", p2_port);
1046
1047        extra_stmts.push(syn::parse_quote! {
1048            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1049        });
1050
1051        let root = crate::staging_util::get_this_crate();
1052
1053        extra_stmts.push(syn::parse_quote! {
1054            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1055        });
1056
1057        parse_quote!(#source_ident)
1058    }
1059
1060    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1061    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1062        let sink_ident = syn::Ident::new(
1063            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1064            Span::call_site(),
1065        );
1066        parse_quote!(#sink_ident)
1067    }
1068
1069    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1070    fn e2o_source(
1071        extra_stmts: &mut Vec<syn::Stmt>,
1072        p1: &Self::External,
1073        p1_port: &<Self::External as Node>::Port,
1074        p2: &Self::Process,
1075        p2_port: &<Self::Process as Node>::Port,
1076        _codec_type: &syn::Type,
1077        shared_handle: String,
1078    ) -> syn::Expr {
1079        p1.connection_info.borrow_mut().insert(
1080            *p1_port,
1081            (
1082                p2.docker_container_name.clone(),
1083                *p2_port,
1084                p2.network.clone(),
1085            ),
1086        );
1087
1088        p2.exposed_ports.borrow_mut().push(*p2_port);
1089
1090        let socket_ident = syn::Ident::new(
1091            &format!("__hydro_deploy_{}_socket", &shared_handle),
1092            Span::call_site(),
1093        );
1094
1095        let source_ident = syn::Ident::new(
1096            &format!("__hydro_deploy_{}_source", &shared_handle),
1097            Span::call_site(),
1098        );
1099
1100        let sink_ident = syn::Ident::new(
1101            &format!("__hydro_deploy_{}_sink", &shared_handle),
1102            Span::call_site(),
1103        );
1104
1105        let bind_addr = format!("0.0.0.0:{}", p2_port);
1106
1107        extra_stmts.push(syn::parse_quote! {
1108            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1109        });
1110
1111        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1112
1113        extra_stmts.push(syn::parse_quote! {
1114            let (#sink_ident, #source_ident) = (#create_expr).split();
1115        });
1116
1117        parse_quote!(#source_ident)
1118    }
1119
1120    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1121    fn e2o_connect(
1122        p1: &Self::External,
1123        p1_port: &<Self::External as Node>::Port,
1124        p2: &Self::Process,
1125        p2_port: &<Self::Process as Node>::Port,
1126        many: bool,
1127        server_hint: NetworkHint,
1128    ) -> Box<dyn FnOnce()> {
1129        if server_hint != NetworkHint::Auto {
1130            panic!(
1131                "Docker deployment only supports NetworkHint::Auto, got {:?}",
1132                server_hint
1133            );
1134        }
1135
1136        // For many connections, we need to populate connection_info so as_bincode_bidi can find it
1137        if many {
1138            p1.connection_info.borrow_mut().insert(
1139                *p1_port,
1140                (
1141                    p2.docker_container_name.clone(),
1142                    *p2_port,
1143                    p2.network.clone(),
1144                ),
1145            );
1146        }
1147
1148        let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1149
1150        Box::new(move || {
1151            trace!(name: "e2o_connect thunk", %serialized);
1152        })
1153    }
1154
1155    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1156    fn o2e_sink(
1157        p1: &Self::Process,
1158        p1_port: &<Self::Process as Node>::Port,
1159        p2: &Self::External,
1160        p2_port: &<Self::External as Node>::Port,
1161        shared_handle: String,
1162    ) -> syn::Expr {
1163        let sink_ident = syn::Ident::new(
1164            &format!("__hydro_deploy_{}_sink", &shared_handle),
1165            Span::call_site(),
1166        );
1167        parse_quote!(#sink_ident)
1168    }
1169
    /// Returns the quoted expression that yields the member ids of a cluster.
    ///
    /// The body calls the free function `cluster_ids` (an unqualified call inside an
    /// `impl` does not resolve to this associated function, so there is no recursion).
    /// `of_cluster` is only recorded as a field of the tracing span.
    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
    fn cluster_ids(
        of_cluster: LocationKey,
    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
        cluster_ids()
    }
1176
    /// Returns the quoted expression that yields the current member's own id.
    ///
    /// The body calls the free function `cluster_self_id` (an unqualified call inside an
    /// `impl` does not resolve to this associated function, so there is no recursion).
    #[instrument(level = "trace", skip_all)]
    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
        cluster_self_id()
    }
1181
    /// Returns the quoted expression that yields a stream of membership events for
    /// `location_id`.
    ///
    /// The body calls the free function `cluster_membership_stream` (an unqualified call
    /// inside an `impl` does not resolve to this associated function, so there is no
    /// recursion). `_env` and `_at_location` are ignored.
    #[instrument(level = "trace", skip_all, fields(?location_id))]
    fn cluster_membership_stream(
        _env: &mut Self::InstantiateEnv,
        _at_location: &LocationId,
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
    {
        cluster_membership_stream(location_id)
    }
1191}
1192
/// The 36 lowercase ASCII alphanumerics (`0`-`9`, `a`-`z`), passed to `nanoid!` as the
/// alphabet for the random suffixes embedded in Docker network and image names.
const CONTAINER_ALPHABET: [char; 36] = [
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1197
1198#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1199fn get_docker_image_name(
1200    name_hint: &str,
1201    location_key: LocationKey,
1202    deployment_instance: &str,
1203) -> String {
1204    let name_hint = name_hint
1205        .split("::")
1206        .last()
1207        .unwrap()
1208        .to_ascii_lowercase()
1209        .replace(".", "-")
1210        .replace("_", "-")
1211        .replace("::", "-");
1212
1213    let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1214
1215    format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1216}
1217
1218#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1219fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1220    if let Some(instance) = instance {
1221        format!("{image_name}-{instance}")
1222    } else {
1223        image_name.to_owned()
1224    }
1225}
/// Represents a Process running in a docker container
#[derive(Clone)]
pub struct DockerDeployProcessSpec {
    // Handed unchanged to the built `DockerDeployProcess`.
    // NOTE(review): presumably extra options for compiling the process binary — confirm at the use site.
    compilation_options: Option<String>,
    // Handed unchanged to the built `DockerDeployProcess`.
    // NOTE(review): meaning of the entries is not visible here — confirm at the use site.
    config: Vec<String>,
    // Docker network handed to the built `DockerDeployProcess`.
    network: DockerNetwork,
    // Embedded in the image name generated by `get_docker_image_name`.
    deployment_instance: String,
}
1234
1235impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1236    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1237    fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1238        DockerDeployProcess {
1239            key,
1240            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1241
1242            next_port: Rc::new(RefCell::new(1000)),
1243            rust_crate: Rc::new(RefCell::new(None)),
1244
1245            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1246
1247            docker_container_name: Rc::new(RefCell::new(None)),
1248
1249            compilation_options: self.compilation_options,
1250            config: self.config,
1251
1252            network: self.network.clone(),
1253        }
1254    }
1255}
1256
/// Represents a Cluster running across `count` docker containers.
#[derive(Clone)]
pub struct DockerDeployClusterSpec {
    // Handed unchanged to the built `DockerDeployCluster`.
    // NOTE(review): presumably extra options for compiling the cluster binary — confirm at the use site.
    compilation_options: Option<String>,
    // Handed unchanged to the built `DockerDeployCluster`.
    // NOTE(review): meaning of the entries is not visible here — confirm at the use site.
    config: Vec<String>,
    // Number of containers in the cluster; copied into the built `DockerDeployCluster`.
    count: usize,
    // Embedded in the image name generated by `get_docker_image_name`.
    deployment_instance: String,
}
1265
1266impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1267    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1268    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1269        DockerDeployCluster {
1270            key,
1271            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1272
1273            next_port: Rc::new(RefCell::new(1000)),
1274            rust_crate: Rc::new(RefCell::new(None)),
1275
1276            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1277
1278            compilation_options: self.compilation_options,
1279            config: self.config,
1280
1281            count: self.count,
1282        }
1283    }
1284}
1285
/// Represents an external process outside of the management of hydro deploy.
pub struct DockerDeployExternalSpec {
    // Becomes the `name` of the built `DockerDeployExternal`.
    name: String,
}
1290
1291impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1292    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1293    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1294        DockerDeployExternal {
1295            name: self.name,
1296            next_port: Rc::new(RefCell::new(10000)),
1297            ports: Rc::new(RefCell::new(HashMap::new())),
1298            connection_info: Rc::new(RefCell::new(HashMap::new())),
1299        }
1300    }
1301}