1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17 type Meta: Default;
18 type InstantiateEnv;
19
20 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23 + RegisterPort<'a, Self>;
24
25 fn o2o_sink_source(
31 env: &mut Self::InstantiateEnv,
32 p1: &Self::Process,
33 p1_port: &<Self::Process as Node>::Port,
34 p2: &Self::Process,
35 p2_port: &<Self::Process as Node>::Port,
36 name: Option<&str>,
37 ) -> (syn::Expr, syn::Expr);
38
39 fn o2o_connect(
44 p1: &Self::Process,
45 p1_port: &<Self::Process as Node>::Port,
46 p2: &Self::Process,
47 p2_port: &<Self::Process as Node>::Port,
48 ) -> Box<dyn FnOnce()>;
49
50 fn o2m_sink_source(
58 env: &mut Self::InstantiateEnv,
59 p1: &Self::Process,
60 p1_port: &<Self::Process as Node>::Port,
61 c2: &Self::Cluster,
62 c2_port: &<Self::Cluster as Node>::Port,
63 name: Option<&str>,
64 ) -> (syn::Expr, syn::Expr);
65
66 fn o2m_connect(
71 p1: &Self::Process,
72 p1_port: &<Self::Process as Node>::Port,
73 c2: &Self::Cluster,
74 c2_port: &<Self::Cluster as Node>::Port,
75 ) -> Box<dyn FnOnce()>;
76
77 fn m2o_sink_source(
85 env: &mut Self::InstantiateEnv,
86 c1: &Self::Cluster,
87 c1_port: &<Self::Cluster as Node>::Port,
88 p2: &Self::Process,
89 p2_port: &<Self::Process as Node>::Port,
90 name: Option<&str>,
91 ) -> (syn::Expr, syn::Expr);
92
93 fn m2o_connect(
98 c1: &Self::Cluster,
99 c1_port: &<Self::Cluster as Node>::Port,
100 p2: &Self::Process,
101 p2_port: &<Self::Process as Node>::Port,
102 ) -> Box<dyn FnOnce()>;
103
104 fn m2m_sink_source(
112 env: &mut Self::InstantiateEnv,
113 c1: &Self::Cluster,
114 c1_port: &<Self::Cluster as Node>::Port,
115 c2: &Self::Cluster,
116 c2_port: &<Self::Cluster as Node>::Port,
117 name: Option<&str>,
118 ) -> (syn::Expr, syn::Expr);
119
120 fn m2m_connect(
125 c1: &Self::Cluster,
126 c1_port: &<Self::Cluster as Node>::Port,
127 c2: &Self::Cluster,
128 c2_port: &<Self::Cluster as Node>::Port,
129 ) -> Box<dyn FnOnce()>;
130
131 fn e2o_many_source(
132 extra_stmts: &mut Vec<syn::Stmt>,
133 p2: &Self::Process,
134 p2_port: &<Self::Process as Node>::Port,
135 codec_type: &syn::Type,
136 shared_handle: String,
137 ) -> syn::Expr;
138 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
139
140 fn e2o_source(
141 extra_stmts: &mut Vec<syn::Stmt>,
142 p1: &Self::External,
143 p1_port: &<Self::External as Node>::Port,
144 p2: &Self::Process,
145 p2_port: &<Self::Process as Node>::Port,
146 codec_type: &syn::Type,
147 shared_handle: String,
148 ) -> syn::Expr;
149 fn e2o_connect(
150 p1: &Self::External,
151 p1_port: &<Self::External as Node>::Port,
152 p2: &Self::Process,
153 p2_port: &<Self::Process as Node>::Port,
154 many: bool,
155 server_hint: NetworkHint,
156 ) -> Box<dyn FnOnce()>;
157
158 fn o2e_sink(
159 p1: &Self::Process,
160 p1_port: &<Self::Process as Node>::Port,
161 p2: &Self::External,
162 p2_port: &<Self::External as Node>::Port,
163 shared_handle: String,
164 ) -> syn::Expr;
165
166 fn cluster_ids(
167 of_cluster: LocationKey,
168 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
169
170 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
171
172 fn cluster_membership_stream(
173 env: &mut Self::InstantiateEnv,
174 at_location: &LocationId,
175 location_id: &LocationId,
176 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
177
178 fn register_embedded_input(
183 _env: &mut Self::InstantiateEnv,
184 _location_key: LocationKey,
185 _ident: &syn::Ident,
186 _element_type: &syn::Type,
187 ) {
188 panic!("register_embedded_input is only supported by EmbeddedDeploy");
189 }
190
191 fn register_embedded_output(
196 _env: &mut Self::InstantiateEnv,
197 _location_key: LocationKey,
198 _ident: &syn::Ident,
199 _element_type: &syn::Type,
200 ) {
201 panic!("register_embedded_output is only supported by EmbeddedDeploy");
202 }
203}
204
205pub trait ProcessSpec<'a, D>
206where
207 D: Deploy<'a> + ?Sized,
208{
209 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
210}
211
212pub trait IntoProcessSpec<'a, D>
213where
214 D: Deploy<'a> + ?Sized,
215{
216 type ProcessSpec: ProcessSpec<'a, D>;
217 fn into_process_spec(self) -> Self::ProcessSpec;
218}
219
220impl<'a, D, T> IntoProcessSpec<'a, D> for T
221where
222 D: Deploy<'a> + ?Sized,
223 T: ProcessSpec<'a, D>,
224{
225 type ProcessSpec = T;
226 fn into_process_spec(self) -> Self::ProcessSpec {
227 self
228 }
229}
230
231pub trait ClusterSpec<'a, D>
232where
233 D: Deploy<'a> + ?Sized,
234{
235 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
236}
237
238pub trait ExternalSpec<'a, D>
239where
240 D: Deploy<'a> + ?Sized,
241{
242 fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
243}
244
245pub trait Node {
246 type Port: Clone;
253 type Meta: Default;
254 type InstantiateEnv;
255
256 fn next_port(&self) -> Self::Port;
258
259 fn update_meta(&self, meta: &Self::Meta);
260
261 fn instantiate(
262 &self,
263 env: &mut Self::InstantiateEnv,
264 meta: &mut Self::Meta,
265 graph: DfirGraph,
266 extra_stmts: &[syn::Stmt],
267 sidecars: &[syn::Expr],
268 );
269}
270
271pub type DynSourceSink<Out, In, InErr> = (
272 Pin<Box<dyn Stream<Item = Out>>>,
273 Pin<Box<dyn Sink<In, Error = InErr>>>,
274);
275
276pub trait RegisterPort<'a, D>: Node + Clone
277where
278 D: Deploy<'a> + ?Sized,
279{
280 fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
281
282 fn as_bytes_bidi(
283 &self,
284 external_port_id: ExternalPortId,
285 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
286
287 fn as_bincode_bidi<InT, OutT>(
288 &self,
289 external_port_id: ExternalPortId,
290 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
291 where
292 InT: Serialize + 'static,
293 OutT: DeserializeOwned + 'static;
294
295 fn as_bincode_sink<T>(
296 &self,
297 external_port_id: ExternalPortId,
298 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
299 where
300 T: Serialize + 'static;
301
302 fn as_bincode_source<T>(
303 &self,
304 external_port_id: ExternalPortId,
305 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
306 where
307 T: DeserializeOwned + 'static;
308}