1use std::cell::RefCell;
8use std::future::Future;
9use std::io::{BufRead, BufReader, Error};
10use std::path::PathBuf;
11use std::pin::Pin;
12use std::process::Stdio;
13use std::rc::Rc;
14
15use bytes::{Bytes, BytesMut};
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, Stream};
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use stageleft::{QuotedWithContext, RuntimeData};
21
22use super::deploy_runtime_maelstrom::*;
23use crate::compile::builder::ExternalPortId;
24use crate::compile::deploy_provider::{ClusterSpec, Deploy, Node, RegisterPort};
25use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
26use crate::location::dynamic::LocationId;
27use crate::location::member_id::TaglessMemberId;
28use crate::location::{LocationKey, MembershipEvent, NetworkHint};
29
30pub enum MaelstromDeploy {}
37
38impl<'a> Deploy<'a> for MaelstromDeploy {
39 type Meta = ();
40 type InstantiateEnv = MaelstromDeployment;
41
42 type Process = MaelstromProcess;
43 type Cluster = MaelstromCluster;
44 type External = MaelstromExternal;
45
46 fn o2o_sink_source(
47 _env: &mut Self::InstantiateEnv,
48 _p1: &Self::Process,
49 _p1_port: &<Self::Process as Node>::Port,
50 _p2: &Self::Process,
51 _p2_port: &<Self::Process as Node>::Port,
52 _name: Option<&str>,
53 ) -> (syn::Expr, syn::Expr) {
54 panic!("Maelstrom deployment does not support processes, only clusters")
55 }
56
57 fn o2o_connect(
58 _p1: &Self::Process,
59 _p1_port: &<Self::Process as Node>::Port,
60 _p2: &Self::Process,
61 _p2_port: &<Self::Process as Node>::Port,
62 ) -> Box<dyn FnOnce()> {
63 panic!("Maelstrom deployment does not support processes, only clusters")
64 }
65
66 fn o2m_sink_source(
67 _env: &mut Self::InstantiateEnv,
68 _p1: &Self::Process,
69 _p1_port: &<Self::Process as Node>::Port,
70 _c2: &Self::Cluster,
71 _c2_port: &<Self::Cluster as Node>::Port,
72 _name: Option<&str>,
73 ) -> (syn::Expr, syn::Expr) {
74 panic!("Maelstrom deployment does not support processes, only clusters")
75 }
76
77 fn o2m_connect(
78 _p1: &Self::Process,
79 _p1_port: &<Self::Process as Node>::Port,
80 _c2: &Self::Cluster,
81 _c2_port: &<Self::Cluster as Node>::Port,
82 ) -> Box<dyn FnOnce()> {
83 panic!("Maelstrom deployment does not support processes, only clusters")
84 }
85
86 fn m2o_sink_source(
87 _env: &mut Self::InstantiateEnv,
88 _c1: &Self::Cluster,
89 _c1_port: &<Self::Cluster as Node>::Port,
90 _p2: &Self::Process,
91 _p2_port: &<Self::Process as Node>::Port,
92 _name: Option<&str>,
93 ) -> (syn::Expr, syn::Expr) {
94 panic!("Maelstrom deployment does not support processes, only clusters")
95 }
96
97 fn m2o_connect(
98 _c1: &Self::Cluster,
99 _c1_port: &<Self::Cluster as Node>::Port,
100 _p2: &Self::Process,
101 _p2_port: &<Self::Process as Node>::Port,
102 ) -> Box<dyn FnOnce()> {
103 panic!("Maelstrom deployment does not support processes, only clusters")
104 }
105
106 fn m2m_sink_source(
107 _env: &mut Self::InstantiateEnv,
108 _c1: &Self::Cluster,
109 _c1_port: &<Self::Cluster as Node>::Port,
110 _c2: &Self::Cluster,
111 _c2_port: &<Self::Cluster as Node>::Port,
112 _name: Option<&str>,
113 ) -> (syn::Expr, syn::Expr) {
114 deploy_maelstrom_m2m(RuntimeData::new("__hydro_lang_maelstrom_meta"))
115 }
116
117 fn m2m_connect(
118 _c1: &Self::Cluster,
119 _c1_port: &<Self::Cluster as Node>::Port,
120 _c2: &Self::Cluster,
121 _c2_port: &<Self::Cluster as Node>::Port,
122 ) -> Box<dyn FnOnce()> {
123 Box::new(|| {})
125 }
126
127 fn e2o_many_source(
128 _extra_stmts: &mut Vec<syn::Stmt>,
129 _p2: &Self::Process,
130 _p2_port: &<Self::Process as Node>::Port,
131 _codec_type: &syn::Type,
132 _shared_handle: String,
133 ) -> syn::Expr {
134 panic!("Maelstrom deployment does not support processes, only clusters")
135 }
136
137 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
138 panic!("Maelstrom deployment does not support processes, only clusters")
139 }
140
141 fn e2o_source(
142 _extra_stmts: &mut Vec<syn::Stmt>,
143 _p1: &Self::External,
144 _p1_port: &<Self::External as Node>::Port,
145 _p2: &Self::Process,
146 _p2_port: &<Self::Process as Node>::Port,
147 _codec_type: &syn::Type,
148 _shared_handle: String,
149 ) -> syn::Expr {
150 panic!("Maelstrom deployment does not support processes, only clusters")
151 }
152
153 fn e2o_connect(
154 _p1: &Self::External,
155 _p1_port: &<Self::External as Node>::Port,
156 _p2: &Self::Process,
157 _p2_port: &<Self::Process as Node>::Port,
158 _many: bool,
159 _server_hint: NetworkHint,
160 ) -> Box<dyn FnOnce()> {
161 panic!("Maelstrom deployment does not support processes, only clusters")
162 }
163
164 fn o2e_sink(
165 _p1: &Self::Process,
166 _p1_port: &<Self::Process as Node>::Port,
167 _p2: &Self::External,
168 _p2_port: &<Self::External as Node>::Port,
169 _shared_handle: String,
170 ) -> syn::Expr {
171 panic!("Maelstrom deployment does not support processes, only clusters")
172 }
173
174 fn cluster_ids(
175 _of_cluster: LocationKey,
176 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
177 cluster_members(RuntimeData::new("__hydro_lang_maelstrom_meta"), _of_cluster)
178 }
179
180 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
181 cluster_self_id(RuntimeData::new("__hydro_lang_maelstrom_meta"))
182 }
183
184 fn cluster_membership_stream(
185 _env: &mut Self::InstantiateEnv,
186 _at_location: &LocationId,
187 location_id: &LocationId,
188 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
189 {
190 cluster_membership_stream(location_id)
191 }
192}
193
194#[derive(Clone)]
196pub struct MaelstromProcess {
197 _private: (),
198}
199
200impl Node for MaelstromProcess {
201 type Port = String;
202 type Meta = ();
203 type InstantiateEnv = MaelstromDeployment;
204
205 fn next_port(&self) -> Self::Port {
206 panic!("Maelstrom deployment does not support processes")
207 }
208
209 fn update_meta(&self, _meta: &Self::Meta) {}
210
211 fn instantiate(
212 &self,
213 _env: &mut Self::InstantiateEnv,
214 _meta: &mut Self::Meta,
215 _graph: DfirGraph,
216 _extra_stmts: &[syn::Stmt],
217 _sidecars: &[syn::Expr],
218 ) {
219 panic!("Maelstrom deployment does not support processes")
220 }
221}
222
223#[derive(Clone)]
225pub struct MaelstromCluster {
226 next_port: Rc<RefCell<usize>>,
227 name_hint: Option<String>,
228}
229
230impl Node for MaelstromCluster {
231 type Port = String;
232 type Meta = ();
233 type InstantiateEnv = MaelstromDeployment;
234
235 fn next_port(&self) -> Self::Port {
236 let next_port = *self.next_port.borrow();
237 *self.next_port.borrow_mut() += 1;
238 format!("port_{}", next_port)
239 }
240
241 fn update_meta(&self, _meta: &Self::Meta) {}
242
243 fn instantiate(
244 &self,
245 env: &mut Self::InstantiateEnv,
246 _meta: &mut Self::Meta,
247 graph: DfirGraph,
248 extra_stmts: &[syn::Stmt],
249 sidecars: &[syn::Expr],
250 ) {
251 let (bin_name, config) = create_graph_trybuild(
252 graph,
253 extra_stmts,
254 sidecars,
255 self.name_hint.as_deref(),
256 crate::compile::trybuild::generate::DeployMode::Maelstrom,
257 LinkingMode::Static,
258 );
259
260 env.bin_name = Some(bin_name);
261 env.project_dir = Some(config.project_dir);
262 env.target_dir = Some(config.target_dir);
263 env.features = config.features;
264 }
265}
266
267#[derive(Clone)]
269pub enum MaelstromExternal {}
270
271impl Node for MaelstromExternal {
272 type Port = String;
273 type Meta = ();
274 type InstantiateEnv = MaelstromDeployment;
275
276 fn next_port(&self) -> Self::Port {
277 unreachable!()
278 }
279
280 fn update_meta(&self, _meta: &Self::Meta) {}
281
282 fn instantiate(
283 &self,
284 _env: &mut Self::InstantiateEnv,
285 _meta: &mut Self::Meta,
286 _graph: DfirGraph,
287 _extra_stmts: &[syn::Stmt],
288 _sidecars: &[syn::Expr],
289 ) {
290 unreachable!()
291 }
292}
293
294impl<'a> RegisterPort<'a, MaelstromDeploy> for MaelstromExternal {
295 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
296 unreachable!()
297 }
298
299 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
300 fn as_bytes_bidi(
301 &self,
302 _external_port_id: ExternalPortId,
303 ) -> impl Future<
304 Output = (
305 Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
306 Pin<Box<dyn Sink<Bytes, Error = Error>>>,
307 ),
308 > + 'a {
309 async move { unreachable!() }
310 }
311
312 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
313 fn as_bincode_bidi<InT, OutT>(
314 &self,
315 _external_port_id: ExternalPortId,
316 ) -> impl Future<
317 Output = (
318 Pin<Box<dyn Stream<Item = OutT>>>,
319 Pin<Box<dyn Sink<InT, Error = Error>>>,
320 ),
321 > + 'a
322 where
323 InT: Serialize + 'static,
324 OutT: DeserializeOwned + 'static,
325 {
326 async move { unreachable!() }
327 }
328
329 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
330 fn as_bincode_sink<T: Serialize + 'static>(
331 &self,
332 _external_port_id: ExternalPortId,
333 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
334 async move { unreachable!() }
335 }
336
337 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
338 fn as_bincode_source<T: DeserializeOwned + 'static>(
339 &self,
340 _external_port_id: ExternalPortId,
341 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
342 async move { unreachable!() }
343 }
344}
345
346#[derive(Clone)]
348pub struct MaelstromClusterSpec;
349
350impl<'a> ClusterSpec<'a, MaelstromDeploy> for MaelstromClusterSpec {
351 fn build(self, key: LocationKey, name_hint: &str) -> MaelstromCluster {
352 assert_eq!(
353 key,
354 LocationKey::FIRST,
355 "there should only be one location for a Maelstrom deployment"
356 );
357 MaelstromCluster {
358 next_port: Rc::new(RefCell::new(0)),
359 name_hint: Some(name_hint.to_owned()),
360 }
361 }
362}
363
/// Configuration for one Maelstrom test run, plus the build artifacts recorded
/// while the cluster is instantiated.
///
/// The public fields map onto `maelstrom test` command-line arguments (see
/// [`MaelstromDeployment::run`]); they can be set directly or through the
/// builder-style methods.
pub struct MaelstromDeployment {
    /// Value passed as `--node-count`.
    pub node_count: usize,
    /// Executable that is launched for the test run.
    pub maelstrom_path: PathBuf,
    /// Workload name passed via `-w`.
    pub workload: String,
    /// Value passed as `--time-limit` (seconds) when set.
    pub time_limit: Option<u64>,
    /// Value passed as `--rate` when set.
    pub rate: Option<u64>,
    /// Value passed as `--availability` when set.
    pub availability: Option<String>,
    /// Value passed as `--nemesis` when set.
    pub nemesis: Option<String>,
    /// Arguments appended verbatim after all other arguments.
    pub extra_args: Vec<String>,

    /// Name of the cargo example to build; `None` until the cluster is instantiated.
    pub(crate) bin_name: Option<String>,
    /// Directory `cargo build` is run from; `None` until the cluster is instantiated.
    pub(crate) project_dir: Option<PathBuf>,
    /// Directory used as `CARGO_TARGET_DIR`; `None` until the cluster is instantiated.
    pub(crate) target_dir: Option<PathBuf>,
    /// Additional cargo features to enable for the build.
    pub(crate) features: Option<Vec<String>>,
}

impl MaelstromDeployment {
    /// Creates a deployment for `workload` with one node, the executable
    /// `maelstrom` (resolved by the OS when spawned) and no optional settings.
    pub fn new(workload: impl Into<String>) -> Self {
        Self {
            node_count: 1,
            maelstrom_path: PathBuf::from("maelstrom"),
            workload: workload.into(),
            time_limit: None,
            rate: None,
            availability: None,
            nemesis: None,
            extra_args: vec![],
            bin_name: None,
            project_dir: None,
            target_dir: None,
            features: None,
        }
    }

    /// Sets the value passed as `--node-count`.
    pub fn node_count(mut self, count: usize) -> Self {
        self.node_count = count;
        self
    }

    /// Sets the Maelstrom executable to launch.
    pub fn maelstrom_path(mut self, path: impl Into<PathBuf>) -> Self {
        self.maelstrom_path = path.into();
        self
    }

    /// Sets the value passed as `--time-limit`, in seconds.
    pub fn time_limit(mut self, seconds: u64) -> Self {
        self.time_limit = Some(seconds);
        self
    }

    /// Sets the value passed as `--rate`.
    pub fn rate(mut self, rate: u64) -> Self {
        self.rate = Some(rate);
        self
    }

    /// Sets the value passed as `--availability`.
    pub fn availability(mut self, availability: impl Into<String>) -> Self {
        self.availability = Some(availability.into());
        self
    }

    /// Sets the value passed as `--nemesis`.
    pub fn nemesis(mut self, nemesis: impl Into<String>) -> Self {
        self.nemesis = Some(nemesis.into());
        self
    }

    /// Appends `args` to the extra arguments; repeated calls accumulate.
    pub fn extra_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
        self.extra_args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Builds the generated example with `cargo build` and returns the path of
    /// the resulting binary.
    ///
    /// # Errors
    ///
    /// Returns an error when cargo cannot be launched or exits unsuccessfully.
    ///
    /// # Panics
    ///
    /// Panics when the binary name, project directory or target directory has
    /// not been recorded yet.
    pub fn build(&self) -> Result<PathBuf, Error> {
        let bin_name = self
            .bin_name
            .as_deref()
            .expect("No binary name set - did you call deploy?");
        let project_dir = self.project_dir.as_deref().expect("No project dir set");
        let target_dir = self.target_dir.as_deref().expect("No target dir set");

        // The Maelstrom runtime feature is always enabled, so the list is never
        // empty and `--features` can be passed unconditionally.
        let features = std::iter::once("hydro___feature_maelstrom_runtime")
            .chain(self.features.iter().flatten().map(String::as_str))
            .collect::<Vec<_>>()
            .join(",");

        let mut cmd = std::process::Command::new("cargo");
        cmd.arg("build")
            .arg("--example")
            .arg(bin_name)
            .arg("--no-default-features")
            .arg("--features")
            .arg(features)
            .current_dir(project_dir)
            .env("CARGO_TARGET_DIR", target_dir)
            .env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");

        let status = cmd.status()?;
        if !status.success() {
            return Err(Error::other(format!(
                "cargo build failed with status: {}",
                status
            )));
        }

        Ok(Self::debug_example_path(target_dir, bin_name))
    }

    /// Builds the binary, runs `maelstrom test` against it and reports the
    /// verdict Maelstrom prints.
    ///
    /// Maelstrom's stdout is echoed to stderr line by line and read to the
    /// end, and the child process is always waited on before returning so it
    /// is not left unreaped. The result is derived from the printed verdict
    /// only; the exit status is not inspected.
    ///
    /// # Errors
    ///
    /// Returns an error when the build fails, Maelstrom cannot be spawned,
    /// read from or waited on, the analysis is reported as invalid, or no
    /// verdict line is printed at all.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as [`MaelstromDeployment::build`].
    pub fn run(self) -> Result<(), Error> {
        let binary_path = self.build()?;

        let mut cmd = std::process::Command::new(&self.maelstrom_path);
        cmd.arg("test")
            .arg("-w")
            .arg(&self.workload)
            .arg("--bin")
            .arg(&binary_path)
            .arg("--node-count")
            .arg(self.node_count.to_string())
            .stdout(Stdio::piped());

        if let Some(time_limit) = self.time_limit {
            cmd.arg("--time-limit").arg(time_limit.to_string());
        }

        if let Some(rate) = self.rate {
            cmd.arg("--rate").arg(rate.to_string());
        }

        if let Some(availability) = &self.availability {
            cmd.arg("--availability").arg(availability);
        }

        if let Some(nemesis) = &self.nemesis {
            cmd.arg("--nemesis").arg(nemesis);
        }

        cmd.args(&self.extra_args);

        let mut child = cmd.spawn()?;
        let stdout = child
            .stdout
            .take()
            .expect("stdout was configured as piped");

        let scanned = Self::scan_output(BufReader::new(stdout));
        if scanned.is_err() {
            // Output could not be read to the end, so the child may still be
            // running; stop it (best effort) so the wait below cannot hang.
            let _ = child.kill();
        }
        // `Child` has no `Drop` that reaps the process, so wait explicitly on
        // every path.
        let reaped = child.wait();

        let verdict = scanned?;
        reaped?;

        match verdict {
            Some(true) => Ok(()),
            Some(false) => Err(Error::other("Analysis was invalid")),
            None => Err(Error::other("Maelstrom produced an unexpected result")),
        }
    }

    /// Returns where the built binary is (or will be) located, or `None` while
    /// the binary name or target directory is still unknown.
    pub fn binary_path(&self) -> Option<PathBuf> {
        let bin_name = self.bin_name.as_deref()?;
        let target_dir = self.target_dir.as_deref()?;
        Some(Self::debug_example_path(target_dir, bin_name))
    }

    /// Path of the debug-profile example `bin_name` below `target_dir`.
    fn debug_example_path(target_dir: &std::path::Path, bin_name: &str) -> PathBuf {
        target_dir.join("debug").join("examples").join(bin_name)
    }

    /// Echoes every line of `output` to stderr and returns the first verdict
    /// found: `Some(true)` for a passing analysis, `Some(false)` for an invalid
    /// one, `None` when no verdict line appears.
    ///
    /// Reading continues to end-of-stream after a verdict so the writer is
    /// never cut off by a closed pipe.
    fn scan_output(output: impl BufRead) -> Result<Option<bool>, Error> {
        let mut verdict = None;
        for line in output.lines() {
            let line = line?;
            eprintln!("{}", &line);

            if verdict.is_some() {
                continue;
            }

            if line.starts_with("Analysis invalid!") {
                verdict = Some(false);
            } else if line.starts_with("Errors occurred during analysis, but no anomalies found.")
                || line.starts_with("Everything looks good!")
            {
                verdict = Some(true);
            }
        }
        Ok(verdict)
    }
}