hydro_lang/
ir.rs

1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21
22#[cfg(feature = "build")]
23use crate::deploy::{Deploy, RegisterPort};
24use crate::location::LocationId;
25
26#[derive(Clone, Hash)]
27pub struct DebugExpr(pub syn::Expr);
28
29impl From<syn::Expr> for DebugExpr {
30    fn from(expr: syn::Expr) -> DebugExpr {
31        DebugExpr(expr)
32    }
33}
34
35impl Deref for DebugExpr {
36    type Target = syn::Expr;
37
38    fn deref(&self) -> &Self::Target {
39        &self.0
40    }
41}
42
43impl ToTokens for DebugExpr {
44    fn to_tokens(&self, tokens: &mut TokenStream) {
45        self.0.to_tokens(tokens);
46    }
47}
48
49impl Debug for DebugExpr {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        write!(f, "{}", self.0.to_token_stream())
52    }
53}
54
55#[derive(Clone, Hash)]
56pub struct DebugType(pub syn::Type);
57
58impl From<syn::Type> for DebugType {
59    fn from(t: syn::Type) -> DebugType {
60        DebugType(t)
61    }
62}
63
64impl Deref for DebugType {
65    type Target = syn::Type;
66
67    fn deref(&self) -> &Self::Target {
68        &self.0
69    }
70}
71
72impl ToTokens for DebugType {
73    fn to_tokens(&self, tokens: &mut TokenStream) {
74        self.0.to_tokens(tokens);
75    }
76}
77
78impl Debug for DebugType {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        write!(f, "{}", self.0.to_token_stream())
81    }
82}
83
84#[allow(clippy::allow_attributes, reason = "Only triggered on nightly.")]
85#[allow(
86    clippy::large_enum_variant,
87    reason = "`Building` is just equivalent to `None`."
88)]
89pub enum DebugInstantiate {
90    Building,
91    Finalized(syn::Expr, syn::Expr, Option<Box<dyn FnOnce()>>),
92}
93
94impl Debug for DebugInstantiate {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        write!(f, "<network instantiate>")
97    }
98}
99
100impl Hash for DebugInstantiate {
101    fn hash<H: Hasher>(&self, _state: &mut H) {
102        // Do nothing
103    }
104}
105
106impl Clone for DebugInstantiate {
107    fn clone(&self) -> Self {
108        match self {
109            DebugInstantiate::Building => DebugInstantiate::Building,
110            DebugInstantiate::Finalized(_, _, _) => {
111                panic!("DebugInstantiate::Finalized should not be cloned")
112            }
113        }
114    }
115}
116
117/// A source in a Hydro graph, where data enters the graph.
118#[derive(Debug, Hash, Clone)]
119pub enum HydroSource {
120    Stream(DebugExpr),
121    ExternalNetwork(),
122    Iter(DebugExpr),
123    Spin(),
124}
125
126#[cfg(feature = "build")]
127pub enum BuildersOrCallback<
128    'a,
129    L: FnMut(&mut HydroLeaf, &mut usize),
130    N: FnMut(&mut HydroNode, &mut usize),
131> {
132    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
133    Callback(L, N),
134}
135
136/// An leaf in a Hydro graph, which is an pipeline that doesn't emit
137/// any downstream values. Traversals over the dataflow graph and
138/// generating DFIR IR start from leaves.
139#[derive(Debug, Hash)]
140pub enum HydroLeaf {
141    ForEach {
142        f: DebugExpr,
143        input: Box<HydroNode>,
144        metadata: HydroIrMetadata,
145    },
146    DestSink {
147        sink: DebugExpr,
148        input: Box<HydroNode>,
149        metadata: HydroIrMetadata,
150    },
151    CycleSink {
152        ident: syn::Ident,
153        location_kind: LocationId,
154        input: Box<HydroNode>,
155        metadata: HydroIrMetadata,
156    },
157}
158
159impl HydroLeaf {
160    #[cfg(feature = "build")]
161    pub fn compile_network<'a, D: Deploy<'a>>(
162        &mut self,
163        compile_env: &D::CompileEnv,
164        seen_tees: &mut SeenTees,
165        seen_tee_locations: &mut SeenTeeLocations,
166        processes: &HashMap<usize, D::Process>,
167        clusters: &HashMap<usize, D::Cluster>,
168        externals: &HashMap<usize, D::ExternalProcess>,
169    ) {
170        self.transform_children(
171            |n, s| {
172                n.compile_network::<D>(
173                    compile_env,
174                    s,
175                    seen_tee_locations,
176                    processes,
177                    clusters,
178                    externals,
179                );
180            },
181            seen_tees,
182        )
183    }
184
185    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
186        self.transform_children(
187            |n, s| {
188                n.connect_network(s);
189            },
190            seen_tees,
191        )
192    }
193
194    pub fn transform_bottom_up(
195        &mut self,
196        transform_leaf: &mut impl FnMut(&mut HydroLeaf),
197        transform_node: &mut impl FnMut(&mut HydroNode),
198        seen_tees: &mut SeenTees,
199    ) {
200        self.transform_children(|n, s| n.transform_bottom_up(transform_node, s), seen_tees);
201
202        transform_leaf(self);
203    }
204
205    pub fn transform_children(
206        &mut self,
207        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
208        seen_tees: &mut SeenTees,
209    ) {
210        match self {
211            HydroLeaf::ForEach { f: _, input, .. }
212            | HydroLeaf::DestSink { sink: _, input, .. }
213            | HydroLeaf::CycleSink {
214                ident: _,
215                location_kind: _,
216                input,
217                ..
218            } => {
219                transform(input, seen_tees);
220            }
221        }
222    }
223
224    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroLeaf {
225        match self {
226            HydroLeaf::ForEach { f, input, metadata } => HydroLeaf::ForEach {
227                f: f.clone(),
228                input: Box::new(input.deep_clone(seen_tees)),
229                metadata: metadata.clone(),
230            },
231            HydroLeaf::DestSink {
232                sink,
233                input,
234                metadata,
235            } => HydroLeaf::DestSink {
236                sink: sink.clone(),
237                input: Box::new(input.deep_clone(seen_tees)),
238                metadata: metadata.clone(),
239            },
240            HydroLeaf::CycleSink {
241                ident,
242                location_kind,
243                input,
244                metadata,
245            } => HydroLeaf::CycleSink {
246                ident: ident.clone(),
247                location_kind: location_kind.clone(),
248                input: Box::new(input.deep_clone(seen_tees)),
249                metadata: metadata.clone(),
250            },
251        }
252    }
253
254    #[cfg(feature = "build")]
255    pub fn emit(
256        &mut self,
257        graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
258        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
259        next_stmt_id: &mut usize,
260    ) {
261        self.emit_core(
262            &mut BuildersOrCallback::Builders::<
263                fn(&mut HydroLeaf, &mut usize),
264                fn(&mut HydroNode, &mut usize),
265            >(graph_builders),
266            built_tees,
267            next_stmt_id,
268        );
269    }
270
271    #[cfg(feature = "build")]
272    pub fn emit_core(
273        &mut self,
274        builders_or_callback: &mut BuildersOrCallback<
275            impl FnMut(&mut HydroLeaf, &mut usize),
276            impl FnMut(&mut HydroNode, &mut usize),
277        >,
278        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
279        next_stmt_id: &mut usize,
280    ) {
281        match self {
282            HydroLeaf::ForEach { f, input, .. } => {
283                let (input_ident, input_location_id) =
284                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
285
286                match builders_or_callback {
287                    BuildersOrCallback::Builders(graph_builders) => {
288                        graph_builders
289                            .entry(input_location_id)
290                            .or_default()
291                            .add_dfir(
292                                parse_quote! {
293                                    #input_ident -> for_each(#f);
294                                },
295                                None,
296                                Some(&next_stmt_id.to_string()),
297                            );
298                    }
299                    BuildersOrCallback::Callback(leaf_callback, _) => {
300                        leaf_callback(self, next_stmt_id);
301                    }
302                }
303
304                *next_stmt_id += 1;
305            }
306
307            HydroLeaf::DestSink { sink, input, .. } => {
308                let (input_ident, input_location_id) =
309                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
310
311                match builders_or_callback {
312                    BuildersOrCallback::Builders(graph_builders) => {
313                        graph_builders
314                            .entry(input_location_id)
315                            .or_default()
316                            .add_dfir(
317                                parse_quote! {
318                                    #input_ident -> dest_sink(#sink);
319                                },
320                                None,
321                                Some(&next_stmt_id.to_string()),
322                            );
323                    }
324                    BuildersOrCallback::Callback(leaf_callback, _) => {
325                        leaf_callback(self, next_stmt_id);
326                    }
327                }
328
329                *next_stmt_id += 1;
330            }
331
332            HydroLeaf::CycleSink {
333                ident,
334                location_kind,
335                input,
336                ..
337            } => {
338                let (input_ident, input_location_id) =
339                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
340
341                let location_id = match location_kind.root() {
342                    LocationId::Process(id) => id,
343                    LocationId::Cluster(id) => id,
344                    LocationId::Tick(_, _) => panic!(),
345                    LocationId::ExternalProcess(_) => panic!(),
346                };
347
348                assert_eq!(
349                    input_location_id, *location_id,
350                    "cycle_sink location mismatch"
351                );
352
353                match builders_or_callback {
354                    BuildersOrCallback::Builders(graph_builders) => {
355                        graph_builders.entry(*location_id).or_default().add_dfir(
356                            parse_quote! {
357                                #ident = #input_ident;
358                            },
359                            None,
360                            None,
361                        );
362                    }
363                    // No ID, no callback
364                    BuildersOrCallback::Callback(_, _) => {}
365                }
366            }
367        }
368    }
369
370    pub fn metadata(&self) -> &HydroIrMetadata {
371        match self {
372            HydroLeaf::ForEach { metadata, .. }
373            | HydroLeaf::DestSink { metadata, .. }
374            | HydroLeaf::CycleSink { metadata, .. } => metadata,
375        }
376    }
377
378    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
379        match self {
380            HydroLeaf::ForEach { metadata, .. }
381            | HydroLeaf::DestSink { metadata, .. }
382            | HydroLeaf::CycleSink { metadata, .. } => metadata,
383        }
384    }
385
386    pub fn print_root(&self) -> String {
387        match self {
388            HydroLeaf::ForEach { f, .. } => format!("ForEach({:?})", f),
389            HydroLeaf::DestSink { sink, .. } => format!("DestSink({:?})", sink),
390            HydroLeaf::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
391        }
392    }
393}
394
395#[cfg(feature = "build")]
396pub fn emit(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, FlatGraphBuilder> {
397    let mut builders = BTreeMap::new();
398    let mut built_tees = HashMap::new();
399    let mut next_stmt_id = 0;
400    for leaf in ir {
401        leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
402    }
403    builders
404}
405
406#[cfg(feature = "build")]
407pub fn traverse_dfir(
408    ir: &mut [HydroLeaf],
409    transform_leaf: impl FnMut(&mut HydroLeaf, &mut usize),
410    transform_node: impl FnMut(&mut HydroNode, &mut usize),
411) {
412    let mut seen_tees = HashMap::new();
413    let mut next_stmt_id = 0;
414    let mut callback = BuildersOrCallback::Callback(transform_leaf, transform_node);
415    ir.iter_mut().for_each(|leaf| {
416        leaf.emit_core(&mut callback, &mut seen_tees, &mut next_stmt_id);
417    });
418}
419
420pub fn transform_bottom_up(
421    ir: &mut [HydroLeaf],
422    transform_leaf: &mut impl FnMut(&mut HydroLeaf),
423    transform_node: &mut impl FnMut(&mut HydroNode),
424) {
425    let mut seen_tees = HashMap::new();
426    ir.iter_mut().for_each(|leaf| {
427        leaf.transform_bottom_up(transform_leaf, transform_node, &mut seen_tees);
428    });
429}
430
431pub fn deep_clone(ir: &[HydroLeaf]) -> Vec<HydroLeaf> {
432    let mut seen_tees = HashMap::new();
433    ir.iter()
434        .map(|leaf| leaf.deep_clone(&mut seen_tees))
435        .collect()
436}
437
438type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
439thread_local! {
440    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
441}
442
443pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
444    PRINTED_TEES.with(|printed_tees| {
445        let mut printed_tees_mut = printed_tees.borrow_mut();
446        *printed_tees_mut = Some((0, HashMap::new()));
447        drop(printed_tees_mut);
448
449        let ret = f();
450
451        let mut printed_tees_mut = printed_tees.borrow_mut();
452        *printed_tees_mut = None;
453
454        ret
455    })
456}
457
458pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
459
460impl Debug for TeeNode {
461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462        PRINTED_TEES.with(|printed_tees| {
463            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
464            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
465
466            if let Some(printed_tees_mut) = printed_tees_mut {
467                if let Some(existing) = printed_tees_mut
468                    .1
469                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
470                {
471                    write!(f, "<tee {}>", existing)
472                } else {
473                    let next_id = printed_tees_mut.0;
474                    printed_tees_mut.0 += 1;
475                    printed_tees_mut
476                        .1
477                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
478                    drop(printed_tees_mut_borrow);
479                    write!(f, "<tee {}>: ", next_id)?;
480                    Debug::fmt(&self.0.borrow(), f)
481                }
482            } else {
483                drop(printed_tees_mut_borrow);
484                write!(f, "<tee>: ")?;
485                Debug::fmt(&self.0.borrow(), f)
486            }
487        })
488    }
489}
490
491impl Hash for TeeNode {
492    fn hash<H: Hasher>(&self, state: &mut H) {
493        self.0.borrow_mut().hash(state);
494    }
495}
496
497#[derive(Debug, Clone)]
498pub struct HydroIrMetadata {
499    pub location_kind: LocationId,
500    pub output_type: Option<DebugType>,
501    pub cardinality: Option<usize>,
502    pub cpu_usage: Option<f64>,
503}
504
505// HydroIrMetadata shouldn't be used to hash or compare
506impl Hash for HydroIrMetadata {
507    fn hash<H: Hasher>(&self, _: &mut H) {}
508}
509
510impl PartialEq for HydroIrMetadata {
511    fn eq(&self, _: &Self) -> bool {
512        true
513    }
514}
515
516impl Eq for HydroIrMetadata {}
517
518/// An intermediate node in a Hydro graph, which consumes data
519/// from upstream nodes and emits data to downstream nodes.
520#[allow(clippy::allow_attributes, reason = "Only triggered on nightly.")]
521#[allow(clippy::large_enum_variant, reason = "TODO(mingwei):")]
522#[derive(Debug, Hash)]
523pub enum HydroNode {
524    Placeholder,
525
526    Source {
527        source: HydroSource,
528        location_kind: LocationId,
529        metadata: HydroIrMetadata,
530    },
531
532    CycleSource {
533        ident: syn::Ident,
534        location_kind: LocationId,
535        metadata: HydroIrMetadata,
536    },
537
538    Tee {
539        inner: TeeNode,
540        metadata: HydroIrMetadata,
541    },
542
543    Persist {
544        inner: Box<HydroNode>,
545        metadata: HydroIrMetadata,
546    },
547
548    Unpersist {
549        inner: Box<HydroNode>,
550        metadata: HydroIrMetadata,
551    },
552
553    Delta {
554        inner: Box<HydroNode>,
555        metadata: HydroIrMetadata,
556    },
557
558    Chain {
559        first: Box<HydroNode>,
560        second: Box<HydroNode>,
561        metadata: HydroIrMetadata,
562    },
563
564    CrossProduct {
565        left: Box<HydroNode>,
566        right: Box<HydroNode>,
567        metadata: HydroIrMetadata,
568    },
569
570    CrossSingleton {
571        left: Box<HydroNode>,
572        right: Box<HydroNode>,
573        metadata: HydroIrMetadata,
574    },
575
576    Join {
577        left: Box<HydroNode>,
578        right: Box<HydroNode>,
579        metadata: HydroIrMetadata,
580    },
581
582    Difference {
583        pos: Box<HydroNode>,
584        neg: Box<HydroNode>,
585        metadata: HydroIrMetadata,
586    },
587
588    AntiJoin {
589        pos: Box<HydroNode>,
590        neg: Box<HydroNode>,
591        metadata: HydroIrMetadata,
592    },
593
594    ResolveFutures {
595        input: Box<HydroNode>,
596        metadata: HydroIrMetadata,
597    },
598    ResolveFuturesOrdered {
599        input: Box<HydroNode>,
600        metadata: HydroIrMetadata,
601    },
602
603    Map {
604        f: DebugExpr,
605        input: Box<HydroNode>,
606        metadata: HydroIrMetadata,
607    },
608    FlatMap {
609        f: DebugExpr,
610        input: Box<HydroNode>,
611        metadata: HydroIrMetadata,
612    },
613    Filter {
614        f: DebugExpr,
615        input: Box<HydroNode>,
616        metadata: HydroIrMetadata,
617    },
618    FilterMap {
619        f: DebugExpr,
620        input: Box<HydroNode>,
621        metadata: HydroIrMetadata,
622    },
623
624    DeferTick {
625        input: Box<HydroNode>,
626        metadata: HydroIrMetadata,
627    },
628    Enumerate {
629        is_static: bool,
630        input: Box<HydroNode>,
631        metadata: HydroIrMetadata,
632    },
633    Inspect {
634        f: DebugExpr,
635        input: Box<HydroNode>,
636        metadata: HydroIrMetadata,
637    },
638
639    Unique {
640        input: Box<HydroNode>,
641        metadata: HydroIrMetadata,
642    },
643
644    Sort {
645        input: Box<HydroNode>,
646        metadata: HydroIrMetadata,
647    },
648    Fold {
649        init: DebugExpr,
650        acc: DebugExpr,
651        input: Box<HydroNode>,
652        metadata: HydroIrMetadata,
653    },
654    FoldKeyed {
655        init: DebugExpr,
656        acc: DebugExpr,
657        input: Box<HydroNode>,
658        metadata: HydroIrMetadata,
659    },
660
661    Reduce {
662        f: DebugExpr,
663        input: Box<HydroNode>,
664        metadata: HydroIrMetadata,
665    },
666    ReduceKeyed {
667        f: DebugExpr,
668        input: Box<HydroNode>,
669        metadata: HydroIrMetadata,
670    },
671
672    Network {
673        from_key: Option<usize>,
674        to_location: LocationId,
675        to_key: Option<usize>,
676        serialize_fn: Option<DebugExpr>,
677        instantiate_fn: DebugInstantiate,
678        deserialize_fn: Option<DebugExpr>,
679        input: Box<HydroNode>,
680        metadata: HydroIrMetadata,
681    },
682
683    Counter {
684        tag: String,
685        duration: DebugExpr,
686        input: Box<HydroNode>,
687        metadata: HydroIrMetadata,
688    },
689}
690
691pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
692pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
693
694impl<'a> HydroNode {
695    #[cfg(feature = "build")]
696    pub fn compile_network<D: Deploy<'a>>(
697        &mut self,
698        compile_env: &D::CompileEnv,
699        seen_tees: &mut SeenTees,
700        seen_tee_locations: &mut SeenTeeLocations,
701        nodes: &HashMap<usize, D::Process>,
702        clusters: &HashMap<usize, D::Cluster>,
703        externals: &HashMap<usize, D::ExternalProcess>,
704    ) {
705        let mut curr_location = None;
706
707        self.transform_bottom_up(
708            &mut |n| {
709                if let HydroNode::Network {
710                    from_key,
711                    to_location,
712                    to_key,
713                    instantiate_fn,
714                    ..
715                } = n
716                {
717                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
718                        DebugInstantiate::Building => instantiate_network::<D>(
719                            curr_location.as_ref().unwrap(),
720                            *from_key,
721                            to_location,
722                            *to_key,
723                            nodes,
724                            clusters,
725                            externals,
726                            compile_env,
727                        ),
728
729                        DebugInstantiate::Finalized(_, _, _) => panic!("network already finalized"),
730                    };
731
732                    *instantiate_fn =
733                        DebugInstantiate::Finalized(sink_expr, source_expr, Some(connect_fn));
734                }
735
736                // Calculate location of current node to use as from_location
737                match n {
738                    HydroNode::Network {
739                        to_location: location_kind,
740                        ..
741                    }
742                    | HydroNode::CycleSource { location_kind, .. }
743                    | HydroNode::Source { location_kind, .. } => {
744                        // Unwrap location out of Tick
745                        if let LocationId::Tick(_, tick_loc) = location_kind {
746                            curr_location = Some(*tick_loc.clone());
747                        } else {
748                            curr_location = Some(location_kind.clone());
749                        }
750                    }
751                    HydroNode::Tee { inner, .. } => {
752                        let inner_ref = inner.0.as_ref() as *const RefCell<HydroNode>;
753                        if let Some(tee_location) = seen_tee_locations.get(&inner_ref) {
754                            curr_location = Some(tee_location.clone());
755                        } else {
756                            seen_tee_locations
757                                .insert(inner_ref, curr_location.as_ref().unwrap().clone());
758                        }
759                    }
760                    _ => {}
761                }
762            },
763            seen_tees,
764        );
765    }
766
767    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
768        self.transform_bottom_up(
769            &mut |n| {
770                if let HydroNode::Network { instantiate_fn, .. } = n {
771                    match instantiate_fn {
772                        DebugInstantiate::Building => panic!("network not built"),
773
774                        DebugInstantiate::Finalized(_, _, connect_fn) => {
775                            (connect_fn.take().unwrap())();
776                        }
777                    }
778                }
779            },
780            seen_tees,
781        );
782    }
783
784    pub fn transform_bottom_up(
785        &mut self,
786        transform: &mut impl FnMut(&mut HydroNode),
787        seen_tees: &mut SeenTees,
788    ) {
789        self.transform_children(|n, s| n.transform_bottom_up(transform, s), seen_tees);
790
791        transform(self);
792    }
793
794    #[inline(always)]
795    pub fn transform_children(
796        &mut self,
797        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
798        seen_tees: &mut SeenTees,
799    ) {
800        match self {
801            HydroNode::Placeholder => {
802                panic!();
803            }
804
805            HydroNode::Source { .. } | HydroNode::CycleSource { .. } => {}
806
807            HydroNode::Tee { inner, .. } => {
808                if let Some(transformed) =
809                    seen_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
810                {
811                    *inner = TeeNode(transformed.clone());
812                } else {
813                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
814                    seen_tees.insert(
815                        inner.0.as_ref() as *const RefCell<HydroNode>,
816                        transformed_cell.clone(),
817                    );
818                    let mut orig = inner.0.replace(HydroNode::Placeholder);
819                    transform(&mut orig, seen_tees);
820                    *transformed_cell.borrow_mut() = orig;
821                    *inner = TeeNode(transformed_cell);
822                }
823            }
824
825            HydroNode::Persist { inner, .. }
826            | HydroNode::Unpersist { inner, .. }
827            | HydroNode::Delta { inner, .. } => {
828                transform(inner.as_mut(), seen_tees);
829            }
830
831            HydroNode::Chain { first, second, .. } => {
832                transform(first.as_mut(), seen_tees);
833                transform(second.as_mut(), seen_tees);
834            }
835
836            HydroNode::CrossSingleton { left, right, .. }
837            | HydroNode::CrossProduct { left, right, .. }
838            | HydroNode::Join { left, right, .. } => {
839                transform(left.as_mut(), seen_tees);
840                transform(right.as_mut(), seen_tees);
841            }
842
843            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
844                transform(pos.as_mut(), seen_tees);
845                transform(neg.as_mut(), seen_tees);
846            }
847
848            HydroNode::Map { input, .. }
849            | HydroNode::ResolveFutures { input, .. }
850            | HydroNode::ResolveFuturesOrdered { input, .. }
851            | HydroNode::FlatMap { input, .. }
852            | HydroNode::Filter { input, .. }
853            | HydroNode::FilterMap { input, .. }
854            | HydroNode::Sort { input, .. }
855            | HydroNode::DeferTick { input, .. }
856            | HydroNode::Enumerate { input, .. }
857            | HydroNode::Inspect { input, .. }
858            | HydroNode::Unique { input, .. }
859            | HydroNode::Network { input, .. }
860            | HydroNode::Fold { input, .. }
861            | HydroNode::FoldKeyed { input, .. }
862            | HydroNode::Reduce { input, .. }
863            | HydroNode::ReduceKeyed { input, .. }
864            | HydroNode::Counter { input, .. } => {
865                transform(input.as_mut(), seen_tees);
866            }
867        }
868    }
869
870    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
871        match self {
872            HydroNode::Placeholder => HydroNode::Placeholder,
873            HydroNode::Source {
874                source,
875                location_kind,
876                metadata,
877            } => HydroNode::Source {
878                source: source.clone(),
879                location_kind: location_kind.clone(),
880                metadata: metadata.clone(),
881            },
882            HydroNode::CycleSource {
883                ident,
884                location_kind,
885                metadata,
886            } => HydroNode::CycleSource {
887                ident: ident.clone(),
888                location_kind: location_kind.clone(),
889                metadata: metadata.clone(),
890            },
891            HydroNode::Tee { inner, metadata } => {
892                if let Some(transformed) =
893                    seen_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
894                {
895                    HydroNode::Tee {
896                        inner: TeeNode(transformed.clone()),
897                        metadata: metadata.clone(),
898                    }
899                } else {
900                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
901                    seen_tees.insert(
902                        inner.0.as_ref() as *const RefCell<HydroNode>,
903                        new_rc.clone(),
904                    );
905                    let cloned = inner.0.borrow().deep_clone(seen_tees);
906                    *new_rc.borrow_mut() = cloned;
907                    HydroNode::Tee {
908                        inner: TeeNode(new_rc),
909                        metadata: metadata.clone(),
910                    }
911                }
912            }
913            HydroNode::Persist { inner, metadata } => HydroNode::Persist {
914                inner: Box::new(inner.deep_clone(seen_tees)),
915                metadata: metadata.clone(),
916            },
917            HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
918                inner: Box::new(inner.deep_clone(seen_tees)),
919                metadata: metadata.clone(),
920            },
921            HydroNode::Delta { inner, metadata } => HydroNode::Delta {
922                inner: Box::new(inner.deep_clone(seen_tees)),
923                metadata: metadata.clone(),
924            },
925            HydroNode::Chain {
926                first,
927                second,
928                metadata,
929            } => HydroNode::Chain {
930                first: Box::new(first.deep_clone(seen_tees)),
931                second: Box::new(second.deep_clone(seen_tees)),
932                metadata: metadata.clone(),
933            },
934            HydroNode::CrossProduct {
935                left,
936                right,
937                metadata,
938            } => HydroNode::CrossProduct {
939                left: Box::new(left.deep_clone(seen_tees)),
940                right: Box::new(right.deep_clone(seen_tees)),
941                metadata: metadata.clone(),
942            },
943            HydroNode::CrossSingleton {
944                left,
945                right,
946                metadata,
947            } => HydroNode::CrossSingleton {
948                left: Box::new(left.deep_clone(seen_tees)),
949                right: Box::new(right.deep_clone(seen_tees)),
950                metadata: metadata.clone(),
951            },
952            HydroNode::Join {
953                left,
954                right,
955                metadata,
956            } => HydroNode::Join {
957                left: Box::new(left.deep_clone(seen_tees)),
958                right: Box::new(right.deep_clone(seen_tees)),
959                metadata: metadata.clone(),
960            },
961            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
962                pos: Box::new(pos.deep_clone(seen_tees)),
963                neg: Box::new(neg.deep_clone(seen_tees)),
964                metadata: metadata.clone(),
965            },
966            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
967                pos: Box::new(pos.deep_clone(seen_tees)),
968                neg: Box::new(neg.deep_clone(seen_tees)),
969                metadata: metadata.clone(),
970            },
971            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
972                input: Box::new(input.deep_clone(seen_tees)),
973                metadata: metadata.clone(),
974            },
975            HydroNode::ResolveFuturesOrdered { input, metadata } => {
976                HydroNode::ResolveFuturesOrdered {
977                    input: Box::new(input.deep_clone(seen_tees)),
978                    metadata: metadata.clone(),
979                }
980            }
981            HydroNode::Map { f, input, metadata } => HydroNode::Map {
982                f: f.clone(),
983                input: Box::new(input.deep_clone(seen_tees)),
984                metadata: metadata.clone(),
985            },
986            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
987                f: f.clone(),
988                input: Box::new(input.deep_clone(seen_tees)),
989                metadata: metadata.clone(),
990            },
991            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
992                f: f.clone(),
993                input: Box::new(input.deep_clone(seen_tees)),
994                metadata: metadata.clone(),
995            },
996            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
997                f: f.clone(),
998                input: Box::new(input.deep_clone(seen_tees)),
999                metadata: metadata.clone(),
1000            },
1001            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1002                input: Box::new(input.deep_clone(seen_tees)),
1003                metadata: metadata.clone(),
1004            },
1005            HydroNode::Enumerate {
1006                is_static,
1007                input,
1008                metadata,
1009            } => HydroNode::Enumerate {
1010                is_static: *is_static,
1011                input: Box::new(input.deep_clone(seen_tees)),
1012                metadata: metadata.clone(),
1013            },
1014            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1015                f: f.clone(),
1016                input: Box::new(input.deep_clone(seen_tees)),
1017                metadata: metadata.clone(),
1018            },
1019            HydroNode::Unique { input, metadata } => HydroNode::Unique {
1020                input: Box::new(input.deep_clone(seen_tees)),
1021                metadata: metadata.clone(),
1022            },
1023            HydroNode::Sort { input, metadata } => HydroNode::Sort {
1024                input: Box::new(input.deep_clone(seen_tees)),
1025                metadata: metadata.clone(),
1026            },
1027            HydroNode::Fold {
1028                init,
1029                acc,
1030                input,
1031                metadata,
1032            } => HydroNode::Fold {
1033                init: init.clone(),
1034                acc: acc.clone(),
1035                input: Box::new(input.deep_clone(seen_tees)),
1036                metadata: metadata.clone(),
1037            },
1038            HydroNode::FoldKeyed {
1039                init,
1040                acc,
1041                input,
1042                metadata,
1043            } => HydroNode::FoldKeyed {
1044                init: init.clone(),
1045                acc: acc.clone(),
1046                input: Box::new(input.deep_clone(seen_tees)),
1047                metadata: metadata.clone(),
1048            },
1049            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1050                f: f.clone(),
1051                input: Box::new(input.deep_clone(seen_tees)),
1052                metadata: metadata.clone(),
1053            },
1054            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1055                f: f.clone(),
1056                input: Box::new(input.deep_clone(seen_tees)),
1057                metadata: metadata.clone(),
1058            },
1059            HydroNode::Network {
1060                from_key,
1061                to_location,
1062                to_key,
1063                serialize_fn,
1064                instantiate_fn,
1065                deserialize_fn,
1066                input,
1067                metadata,
1068            } => HydroNode::Network {
1069                from_key: *from_key,
1070                to_location: to_location.clone(),
1071                to_key: *to_key,
1072                serialize_fn: serialize_fn.clone(),
1073                instantiate_fn: instantiate_fn.clone(),
1074                deserialize_fn: deserialize_fn.clone(),
1075                input: Box::new(input.deep_clone(seen_tees)),
1076                metadata: metadata.clone(),
1077            },
1078            HydroNode::Counter {
1079                tag,
1080                duration,
1081                input,
1082                metadata,
1083            } => HydroNode::Counter {
1084                tag: tag.clone(),
1085                duration: duration.clone(),
1086                input: Box::new(input.deep_clone(seen_tees)),
1087                metadata: metadata.clone(),
1088            },
1089        }
1090    }
1091
1092    #[cfg(feature = "build")]
1093    pub fn emit_core(
1094        &mut self,
1095        builders_or_callback: &mut BuildersOrCallback<
1096            impl FnMut(&mut HydroLeaf, &mut usize),
1097            impl FnMut(&mut HydroNode, &mut usize),
1098        >,
1099        built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1100        next_stmt_id: &mut usize,
1101    ) -> (syn::Ident, usize) {
1102        match self {
1103            HydroNode::Placeholder => {
1104                panic!()
1105            }
1106
1107            HydroNode::Persist { inner, .. } => {
1108                let (inner_ident, location) =
1109                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1110
1111                let persist_ident =
1112                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1113
1114                match builders_or_callback {
1115                    BuildersOrCallback::Builders(graph_builders) => {
1116                        let builder = graph_builders.entry(location).or_default();
1117                        builder.add_dfir(
1118                            parse_quote! {
1119                                #persist_ident = #inner_ident -> persist::<'static>();
1120                            },
1121                            None,
1122                            Some(&next_stmt_id.to_string()),
1123                        );
1124                    }
1125                    BuildersOrCallback::Callback(_, node_callback) => {
1126                        node_callback(self, next_stmt_id);
1127                    }
1128                }
1129
1130                *next_stmt_id += 1;
1131
1132                (persist_ident, location)
1133            }
1134
1135            HydroNode::Unpersist { .. } => {
1136                panic!(
1137                    "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1138                )
1139            }
1140
1141            HydroNode::Delta { inner, .. } => {
1142                let (inner_ident, location) =
1143                    inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1144
1145                let delta_ident =
1146                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1147
1148                match builders_or_callback {
1149                    BuildersOrCallback::Builders(graph_builders) => {
1150                        let builder = graph_builders.entry(location).or_default();
1151                        builder.add_dfir(
1152                            parse_quote! {
1153                                #delta_ident = #inner_ident -> multiset_delta();
1154                            },
1155                            None,
1156                            Some(&next_stmt_id.to_string()),
1157                        );
1158                    }
1159                    BuildersOrCallback::Callback(_, node_callback) => {
1160                        node_callback(self, next_stmt_id);
1161                    }
1162                }
1163
1164                *next_stmt_id += 1;
1165
1166                (delta_ident, location)
1167            }
1168
1169            HydroNode::Source {
1170                source,
1171                location_kind,
1172                ..
1173            } => {
1174                let location_id = match location_kind.clone() {
1175                    LocationId::Process(id) => id,
1176                    LocationId::Cluster(id) => id,
1177                    LocationId::Tick(_, _) => panic!(),
1178                    LocationId::ExternalProcess(id) => id,
1179                };
1180
1181                if let HydroSource::ExternalNetwork() = source {
1182                    (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1183                } else {
1184                    let source_ident =
1185                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1186
1187                    let source_stmt = match source {
1188                        HydroSource::Stream(expr) => {
1189                            parse_quote! {
1190                                #source_ident = source_stream(#expr);
1191                            }
1192                        }
1193
1194                        HydroSource::ExternalNetwork() => {
1195                            unreachable!()
1196                        }
1197
1198                        HydroSource::Iter(expr) => {
1199                            parse_quote! {
1200                                #source_ident = source_iter(#expr);
1201                            }
1202                        }
1203
1204                        HydroSource::Spin() => {
1205                            parse_quote! {
1206                                #source_ident = spin();
1207                            }
1208                        }
1209                    };
1210
1211                    match builders_or_callback {
1212                        BuildersOrCallback::Builders(graph_builders) => {
1213                            let builder = graph_builders.entry(location_id).or_default();
1214                            builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1215                        }
1216                        BuildersOrCallback::Callback(_, node_callback) => {
1217                            node_callback(self, next_stmt_id);
1218                        }
1219                    }
1220
1221                    *next_stmt_id += 1;
1222
1223                    (source_ident, location_id)
1224                }
1225            }
1226
1227            HydroNode::CycleSource {
1228                ident,
1229                location_kind,
1230                ..
1231            } => {
1232                let location_id = *match location_kind.root() {
1233                    LocationId::Process(id) => id,
1234                    LocationId::Cluster(id) => id,
1235                    LocationId::Tick(_, _) => panic!(),
1236                    LocationId::ExternalProcess(_) => panic!(),
1237                };
1238
1239                let ident = ident.clone();
1240
1241                match builders_or_callback {
1242                    BuildersOrCallback::Builders(_) => {}
1243                    BuildersOrCallback::Callback(_, node_callback) => {
1244                        node_callback(self, next_stmt_id);
1245                    }
1246                }
1247
1248                // consume a stmt id even though we did not emit anything so that we can instrument this
1249                *next_stmt_id += 1;
1250
1251                (ident, location_id)
1252            }
1253
1254            HydroNode::Tee { inner, .. } => {
1255                let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1256                    built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1257                {
1258                    match builders_or_callback {
1259                        BuildersOrCallback::Builders(_) => {}
1260                        BuildersOrCallback::Callback(_, node_callback) => {
1261                            node_callback(self, next_stmt_id);
1262                        }
1263                    }
1264
1265                    (teed_from.clone(), *inner_location_id)
1266                } else {
1267                    let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1268                        builders_or_callback,
1269                        built_tees,
1270                        next_stmt_id,
1271                    );
1272
1273                    let tee_ident =
1274                        syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1275
1276                    built_tees.insert(
1277                        inner.0.as_ref() as *const RefCell<HydroNode>,
1278                        (tee_ident.clone(), inner_location_id),
1279                    );
1280
1281                    match builders_or_callback {
1282                        BuildersOrCallback::Builders(graph_builders) => {
1283                            let builder = graph_builders.entry(inner_location_id).or_default();
1284                            builder.add_dfir(
1285                                parse_quote! {
1286                                    #tee_ident = #inner_ident -> tee();
1287                                },
1288                                None,
1289                                Some(&next_stmt_id.to_string()),
1290                            );
1291                        }
1292                        BuildersOrCallback::Callback(_, node_callback) => {
1293                            node_callback(self, next_stmt_id);
1294                        }
1295                    }
1296
1297                    (tee_ident, inner_location_id)
1298                };
1299
1300                // we consume a stmt id regardless of if we emit the tee() operator,
1301                // so that during rewrites we touch all recipients of the tee()
1302
1303                *next_stmt_id += 1;
1304                (ret_ident, inner_location_id)
1305            }
1306
1307            HydroNode::Chain { first, second, .. } => {
1308                let (first_ident, first_location_id) =
1309                    first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1310                let (second_ident, second_location_id) =
1311                    second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1312
1313                assert_eq!(
1314                    first_location_id, second_location_id,
1315                    "chain inputs must be in the same location"
1316                );
1317
1318                let chain_ident =
1319                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1320
1321                match builders_or_callback {
1322                    BuildersOrCallback::Builders(graph_builders) => {
1323                        let builder = graph_builders.entry(first_location_id).or_default();
1324                        builder.add_dfir(
1325                            parse_quote! {
1326                                #chain_ident = chain();
1327                                #first_ident -> [0]#chain_ident;
1328                                #second_ident -> [1]#chain_ident;
1329                            },
1330                            None,
1331                            Some(&next_stmt_id.to_string()),
1332                        );
1333                    }
1334                    BuildersOrCallback::Callback(_, node_callback) => {
1335                        node_callback(self, next_stmt_id);
1336                    }
1337                }
1338
1339                *next_stmt_id += 1;
1340
1341                (chain_ident, first_location_id)
1342            }
1343
1344            HydroNode::CrossSingleton { left, right, .. } => {
1345                let (left_ident, left_location_id) =
1346                    left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1347                let (right_ident, right_location_id) =
1348                    right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1349
1350                assert_eq!(
1351                    left_location_id, right_location_id,
1352                    "cross_singleton inputs must be in the same location"
1353                );
1354
1355                let cross_ident =
1356                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1357
1358                match builders_or_callback {
1359                    BuildersOrCallback::Builders(graph_builders) => {
1360                        let builder = graph_builders.entry(left_location_id).or_default();
1361                        builder.add_dfir(
1362                            parse_quote! {
1363                                #cross_ident = cross_singleton();
1364                                #left_ident -> [input]#cross_ident;
1365                                #right_ident -> [single]#cross_ident;
1366                            },
1367                            None,
1368                            Some(&next_stmt_id.to_string()),
1369                        );
1370                    }
1371                    BuildersOrCallback::Callback(_, node_callback) => {
1372                        node_callback(self, next_stmt_id);
1373                    }
1374                }
1375
1376                *next_stmt_id += 1;
1377
1378                (cross_ident, left_location_id)
1379            }
1380
1381            HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1382                let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1383                    parse_quote!(cross_join_multiset)
1384                } else {
1385                    parse_quote!(join_multiset)
1386                };
1387
1388                let (HydroNode::CrossProduct { left, right, .. }
1389                | HydroNode::Join { left, right, .. }) = self
1390                else {
1391                    unreachable!()
1392                };
1393
1394                let (left_inner, left_lifetime) =
1395                    if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1396                        (left, quote!('static))
1397                    } else {
1398                        (left, quote!('tick))
1399                    };
1400
1401                let (right_inner, right_lifetime) =
1402                    if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1403                        (right, quote!('static))
1404                    } else {
1405                        (right, quote!('tick))
1406                    };
1407
1408                let (left_ident, left_location_id) =
1409                    left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1410                let (right_ident, right_location_id) =
1411                    right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1412
1413                assert_eq!(
1414                    left_location_id, right_location_id,
1415                    "join / cross product inputs must be in the same location"
1416                );
1417
1418                let stream_ident =
1419                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1420
1421                match builders_or_callback {
1422                    BuildersOrCallback::Builders(graph_builders) => {
1423                        let builder = graph_builders.entry(left_location_id).or_default();
1424                        builder.add_dfir(
1425                            parse_quote! {
1426                                #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1427                                #left_ident -> [0]#stream_ident;
1428                                #right_ident -> [1]#stream_ident;
1429                            },
1430                            None,
1431                            Some(&next_stmt_id.to_string()),
1432                        );
1433                    }
1434                    BuildersOrCallback::Callback(_, node_callback) => {
1435                        node_callback(self, next_stmt_id);
1436                    }
1437                }
1438
1439                *next_stmt_id += 1;
1440
1441                (stream_ident, left_location_id)
1442            }
1443
1444            HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1445                let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1446                    parse_quote!(difference_multiset)
1447                } else {
1448                    parse_quote!(anti_join_multiset)
1449                };
1450
1451                let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1452                    self
1453                else {
1454                    unreachable!()
1455                };
1456
1457                let (neg, neg_lifetime) =
1458                    if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
1459                        (neg, quote!('static))
1460                    } else {
1461                        (neg, quote!('tick))
1462                    };
1463
1464                let (pos_ident, pos_location_id) =
1465                    pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
1466                let (neg_ident, neg_location_id) =
1467                    neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
1468
1469                assert_eq!(
1470                    pos_location_id, neg_location_id,
1471                    "difference / anti join inputs must be in the same location"
1472                );
1473
1474                let stream_ident =
1475                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1476
1477                match builders_or_callback {
1478                    BuildersOrCallback::Builders(graph_builders) => {
1479                        let builder = graph_builders.entry(pos_location_id).or_default();
1480                        builder.add_dfir(
1481                            parse_quote! {
1482                                #stream_ident = #operator::<'tick, #neg_lifetime>();
1483                                #pos_ident -> [pos]#stream_ident;
1484                                #neg_ident -> [neg]#stream_ident;
1485                            },
1486                            None,
1487                            Some(&next_stmt_id.to_string()),
1488                        );
1489                    }
1490                    BuildersOrCallback::Callback(_, node_callback) => {
1491                        node_callback(self, next_stmt_id);
1492                    }
1493                }
1494
1495                *next_stmt_id += 1;
1496
1497                (stream_ident, pos_location_id)
1498            }
1499
1500            HydroNode::ResolveFutures { input, .. } => {
1501                let (input_ident, input_location_id) =
1502                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1503
1504                let futures_ident =
1505                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1506
1507                match builders_or_callback {
1508                    BuildersOrCallback::Builders(graph_builders) => {
1509                        let builder = graph_builders.entry(input_location_id).or_default();
1510                        builder.add_dfir(
1511                            parse_quote! {
1512                                #futures_ident = #input_ident -> resolve_futures();
1513                            },
1514                            None,
1515                            Some(&next_stmt_id.to_string()),
1516                        );
1517                    }
1518                    BuildersOrCallback::Callback(_, node_callback) => {
1519                        node_callback(self, next_stmt_id);
1520                    }
1521                }
1522
1523                *next_stmt_id += 1;
1524
1525                (futures_ident, input_location_id)
1526            }
1527
1528            HydroNode::ResolveFuturesOrdered { input, .. } => {
1529                let (input_ident, input_location_id) =
1530                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1531
1532                let futures_ident =
1533                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1534
1535                match builders_or_callback {
1536                    BuildersOrCallback::Builders(graph_builders) => {
1537                        let builder = graph_builders.entry(input_location_id).or_default();
1538                        builder.add_dfir(
1539                            parse_quote! {
1540                                #futures_ident = #input_ident -> resolve_futures_ordered();
1541                            },
1542                            None,
1543                            Some(&next_stmt_id.to_string()),
1544                        );
1545                    }
1546                    BuildersOrCallback::Callback(_, node_callback) => {
1547                        node_callback(self, next_stmt_id);
1548                    }
1549                }
1550
1551                *next_stmt_id += 1;
1552
1553                (futures_ident, input_location_id)
1554            }
1555
1556            HydroNode::Map { f, input, .. } => {
1557                let (input_ident, input_location_id) =
1558                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1559
1560                let map_ident =
1561                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1562
1563                match builders_or_callback {
1564                    BuildersOrCallback::Builders(graph_builders) => {
1565                        let builder = graph_builders.entry(input_location_id).or_default();
1566                        builder.add_dfir(
1567                            parse_quote! {
1568                                #map_ident = #input_ident -> map(#f);
1569                            },
1570                            None,
1571                            Some(&next_stmt_id.to_string()),
1572                        );
1573                    }
1574                    BuildersOrCallback::Callback(_, node_callback) => {
1575                        node_callback(self, next_stmt_id);
1576                    }
1577                }
1578
1579                *next_stmt_id += 1;
1580
1581                (map_ident, input_location_id)
1582            }
1583
1584            HydroNode::FlatMap { f, input, .. } => {
1585                let (input_ident, input_location_id) =
1586                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1587
1588                let flat_map_ident =
1589                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1590
1591                match builders_or_callback {
1592                    BuildersOrCallback::Builders(graph_builders) => {
1593                        let builder = graph_builders.entry(input_location_id).or_default();
1594                        builder.add_dfir(
1595                            parse_quote! {
1596                                #flat_map_ident = #input_ident -> flat_map(#f);
1597                            },
1598                            None,
1599                            Some(&next_stmt_id.to_string()),
1600                        );
1601                    }
1602                    BuildersOrCallback::Callback(_, node_callback) => {
1603                        node_callback(self, next_stmt_id);
1604                    }
1605                }
1606
1607                *next_stmt_id += 1;
1608
1609                (flat_map_ident, input_location_id)
1610            }
1611
1612            HydroNode::Filter { f, input, .. } => {
1613                let (input_ident, input_location_id) =
1614                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1615
1616                let filter_ident =
1617                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1618
1619                match builders_or_callback {
1620                    BuildersOrCallback::Builders(graph_builders) => {
1621                        let builder = graph_builders.entry(input_location_id).or_default();
1622                        builder.add_dfir(
1623                            parse_quote! {
1624                                #filter_ident = #input_ident -> filter(#f);
1625                            },
1626                            None,
1627                            Some(&next_stmt_id.to_string()),
1628                        );
1629                    }
1630                    BuildersOrCallback::Callback(_, node_callback) => {
1631                        node_callback(self, next_stmt_id);
1632                    }
1633                }
1634
1635                *next_stmt_id += 1;
1636
1637                (filter_ident, input_location_id)
1638            }
1639
1640            HydroNode::FilterMap { f, input, .. } => {
1641                let (input_ident, input_location_id) =
1642                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1643
1644                let filter_map_ident =
1645                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1646
1647                match builders_or_callback {
1648                    BuildersOrCallback::Builders(graph_builders) => {
1649                        let builder = graph_builders.entry(input_location_id).or_default();
1650                        builder.add_dfir(
1651                            parse_quote! {
1652                                #filter_map_ident = #input_ident -> filter_map(#f);
1653                            },
1654                            None,
1655                            Some(&next_stmt_id.to_string()),
1656                        );
1657                    }
1658                    BuildersOrCallback::Callback(_, node_callback) => {
1659                        node_callback(self, next_stmt_id);
1660                    }
1661                }
1662
1663                *next_stmt_id += 1;
1664
1665                (filter_map_ident, input_location_id)
1666            }
1667
1668            HydroNode::Sort { input, .. } => {
1669                let (input_ident, input_location_id) =
1670                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1671
1672                let sort_ident =
1673                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1674
1675                match builders_or_callback {
1676                    BuildersOrCallback::Builders(graph_builders) => {
1677                        let builder = graph_builders.entry(input_location_id).or_default();
1678                        builder.add_dfir(
1679                            parse_quote! {
1680                                #sort_ident = #input_ident -> sort();
1681                            },
1682                            None,
1683                            Some(&next_stmt_id.to_string()),
1684                        );
1685                    }
1686                    BuildersOrCallback::Callback(_, node_callback) => {
1687                        node_callback(self, next_stmt_id);
1688                    }
1689                }
1690
1691                *next_stmt_id += 1;
1692
1693                (sort_ident, input_location_id)
1694            }
1695
1696            HydroNode::DeferTick { input, .. } => {
1697                let (input_ident, input_location_id) =
1698                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1699
1700                let defer_tick_ident =
1701                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1702
1703                match builders_or_callback {
1704                    BuildersOrCallback::Builders(graph_builders) => {
1705                        let builder = graph_builders.entry(input_location_id).or_default();
1706                        builder.add_dfir(
1707                            parse_quote! {
1708                                #defer_tick_ident = #input_ident -> defer_tick_lazy();
1709                            },
1710                            None,
1711                            Some(&next_stmt_id.to_string()),
1712                        );
1713                    }
1714                    BuildersOrCallback::Callback(_, node_callback) => {
1715                        node_callback(self, next_stmt_id);
1716                    }
1717                }
1718
1719                *next_stmt_id += 1;
1720
1721                (defer_tick_ident, input_location_id)
1722            }
1723
1724            HydroNode::Enumerate {
1725                is_static, input, ..
1726            } => {
1727                let (input_ident, input_location_id) =
1728                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1729
1730                let enumerate_ident =
1731                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1732
1733                match builders_or_callback {
1734                    BuildersOrCallback::Builders(graph_builders) => {
1735                        let builder = graph_builders.entry(input_location_id).or_default();
1736                        let lifetime = if *is_static {
1737                            quote!('static)
1738                        } else {
1739                            quote!('tick)
1740                        };
1741                        builder.add_dfir(
1742                            parse_quote! {
1743                                #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
1744                            },
1745                            None,
1746                            Some(&next_stmt_id.to_string()),
1747                        );
1748                    }
1749                    BuildersOrCallback::Callback(_, node_callback) => {
1750                        node_callback(self, next_stmt_id);
1751                    }
1752                }
1753
1754                *next_stmt_id += 1;
1755
1756                (enumerate_ident, input_location_id)
1757            }
1758
1759            HydroNode::Inspect { f, input, .. } => {
1760                let (input_ident, input_location_id) =
1761                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1762
1763                let inspect_ident =
1764                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1765
1766                match builders_or_callback {
1767                    BuildersOrCallback::Builders(graph_builders) => {
1768                        let builder = graph_builders.entry(input_location_id).or_default();
1769                        builder.add_dfir(
1770                            parse_quote! {
1771                                #inspect_ident = #input_ident -> inspect(#f);
1772                            },
1773                            None,
1774                            Some(&next_stmt_id.to_string()),
1775                        );
1776                    }
1777                    BuildersOrCallback::Callback(_, node_callback) => {
1778                        node_callback(self, next_stmt_id);
1779                    }
1780                }
1781
1782                *next_stmt_id += 1;
1783
1784                (inspect_ident, input_location_id)
1785            }
1786
1787            HydroNode::Unique { input, .. } => {
1788                let (input_ident, input_location_id) =
1789                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1790
1791                let unique_ident =
1792                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1793
1794                match builders_or_callback {
1795                    BuildersOrCallback::Builders(graph_builders) => {
1796                        let builder = graph_builders.entry(input_location_id).or_default();
1797                        builder.add_dfir(
1798                            parse_quote! {
1799                                #unique_ident = #input_ident -> unique::<'tick>();
1800                            },
1801                            None,
1802                            Some(&next_stmt_id.to_string()),
1803                        );
1804                    }
1805                    BuildersOrCallback::Callback(_, node_callback) => {
1806                        node_callback(self, next_stmt_id);
1807                    }
1808                }
1809
1810                *next_stmt_id += 1;
1811
1812                (unique_ident, input_location_id)
1813            }
1814
1815            HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } => {
1816                let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
1817                    parse_quote!(fold)
1818                } else {
1819                    parse_quote!(fold_keyed)
1820                };
1821
1822                let (HydroNode::Fold {
1823                    init, acc, input, ..
1824                }
1825                | HydroNode::FoldKeyed {
1826                    init, acc, input, ..
1827                }) = self
1828                else {
1829                    unreachable!()
1830                };
1831
1832                let (input, lifetime) =
1833                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1834                        (input, quote!('static))
1835                    } else {
1836                        (input, quote!('tick))
1837                    };
1838
1839                let (input_ident, input_location_id) =
1840                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1841
1842                let fold_ident =
1843                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1844
1845                match builders_or_callback {
1846                    BuildersOrCallback::Builders(graph_builders) => {
1847                        let builder = graph_builders.entry(input_location_id).or_default();
1848                        builder.add_dfir(
1849                            parse_quote! {
1850                                #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
1851                            },
1852                            None,
1853                            Some(&next_stmt_id.to_string()),
1854                        );
1855                    }
1856                    BuildersOrCallback::Callback(_, node_callback) => {
1857                        node_callback(self, next_stmt_id);
1858                    }
1859                }
1860
1861                *next_stmt_id += 1;
1862
1863                (fold_ident, input_location_id)
1864            }
1865
1866            HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
1867                let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
1868                    parse_quote!(reduce)
1869                } else {
1870                    parse_quote!(reduce_keyed)
1871                };
1872
1873                let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
1874                    self
1875                else {
1876                    unreachable!()
1877                };
1878
1879                let (input, lifetime) =
1880                    if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1881                        (input, quote!('static))
1882                    } else {
1883                        (input, quote!('tick))
1884                    };
1885
1886                let (input_ident, input_location_id) =
1887                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1888
1889                let reduce_ident =
1890                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1891
1892                match builders_or_callback {
1893                    BuildersOrCallback::Builders(graph_builders) => {
1894                        let builder = graph_builders.entry(input_location_id).or_default();
1895                        builder.add_dfir(
1896                            parse_quote! {
1897                                #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
1898                            },
1899                            None,
1900                            Some(&next_stmt_id.to_string()),
1901                        );
1902                    }
1903                    BuildersOrCallback::Callback(_, node_callback) => {
1904                        node_callback(self, next_stmt_id);
1905                    }
1906                }
1907
1908                *next_stmt_id += 1;
1909
1910                (reduce_ident, input_location_id)
1911            }
1912
1913            HydroNode::Network {
1914                from_key: _,
1915                to_location,
1916                to_key: _,
1917                serialize_fn: serialize_pipeline,
1918                instantiate_fn,
1919                deserialize_fn: deserialize_pipeline,
1920                input,
1921                ..
1922            } => {
1923                let (input_ident, input_location_id) =
1924                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1925
1926                let to_id = match *to_location {
1927                    LocationId::Process(id) => id,
1928                    LocationId::Cluster(id) => id,
1929                    LocationId::Tick(_, _) => panic!(),
1930                    LocationId::ExternalProcess(id) => id,
1931                };
1932
1933                let receiver_stream_ident =
1934                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1935
1936                match builders_or_callback {
1937                    BuildersOrCallback::Builders(graph_builders) => {
1938                        let (sink_expr, source_expr) = match instantiate_fn {
1939                            DebugInstantiate::Building => (
1940                                syn::parse_quote!(DUMMY_SINK),
1941                                syn::parse_quote!(DUMMY_SOURCE),
1942                            ),
1943
1944                            DebugInstantiate::Finalized(sink, source, _connect_fn) => {
1945                                (sink.clone(), source.clone())
1946                            }
1947                        };
1948
1949                        let sender_builder = graph_builders.entry(input_location_id).or_default();
1950                        if let Some(serialize_pipeline) = serialize_pipeline {
1951                            sender_builder.add_dfir(
1952                                parse_quote! {
1953                                    #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
1954                                },
1955                                None,
1956                                Some(&next_stmt_id.to_string()),
1957                            );
1958                        } else {
1959                            sender_builder.add_dfir(
1960                                parse_quote! {
1961                                    #input_ident -> dest_sink(#sink_expr);
1962                                },
1963                                None,
1964                                Some(&next_stmt_id.to_string()),
1965                            );
1966                        }
1967
1968                        let receiver_builder = graph_builders.entry(to_id).or_default();
1969                        if let Some(deserialize_pipeline) = deserialize_pipeline {
1970                            receiver_builder.add_dfir(parse_quote! {
1971                                #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
1972                            }, None, Some(&next_stmt_id.to_string()));
1973                        } else {
1974                            receiver_builder.add_dfir(
1975                                parse_quote! {
1976                                    #receiver_stream_ident = source_stream(#source_expr);
1977                                },
1978                                None,
1979                                Some(&next_stmt_id.to_string()),
1980                            );
1981                        }
1982                    }
1983                    BuildersOrCallback::Callback(_, node_callback) => {
1984                        node_callback(self, next_stmt_id);
1985                    }
1986                }
1987
1988                *next_stmt_id += 1;
1989
1990                (receiver_stream_ident, to_id)
1991            }
1992
1993            HydroNode::Counter {
1994                tag,
1995                duration,
1996                input,
1997                ..
1998            } => {
1999                let (input_ident, input_location_id) =
2000                    input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2001
2002                let counter_ident =
2003                    syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2004
2005                match builders_or_callback {
2006                    BuildersOrCallback::Builders(graph_builders) => {
2007                        let builder = graph_builders.entry(input_location_id).or_default();
2008                        builder.add_dfir(
2009                            parse_quote! {
2010                                #counter_ident = #input_ident -> _counter(#tag, #duration);
2011                            },
2012                            None,
2013                            Some(&next_stmt_id.to_string()),
2014                        );
2015                    }
2016                    BuildersOrCallback::Callback(_, node_callback) => {
2017                        node_callback(self, next_stmt_id);
2018                    }
2019                }
2020
2021                *next_stmt_id += 1;
2022
2023                (counter_ident, input_location_id)
2024            }
2025        }
2026    }
2027
2028    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2029        match self {
2030            HydroNode::Placeholder => {
2031                panic!()
2032            }
2033            HydroNode::Source { source, .. } => match source {
2034                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2035                HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2036            },
2037            HydroNode::CycleSource { .. }
2038            | HydroNode::Tee { .. }
2039            | HydroNode::Persist { .. }
2040            | HydroNode::Unpersist { .. }
2041            | HydroNode::Delta { .. }
2042            | HydroNode::Chain { .. }
2043            | HydroNode::CrossProduct { .. }
2044            | HydroNode::CrossSingleton { .. }
2045            | HydroNode::ResolveFutures { .. }
2046            | HydroNode::ResolveFuturesOrdered { .. }
2047            | HydroNode::Join { .. }
2048            | HydroNode::Difference { .. }
2049            | HydroNode::AntiJoin { .. }
2050            | HydroNode::DeferTick { .. }
2051            | HydroNode::Enumerate { .. }
2052            | HydroNode::Unique { .. }
2053            | HydroNode::Sort { .. } => {}
2054            HydroNode::Map { f, .. }
2055            | HydroNode::FlatMap { f, .. }
2056            | HydroNode::Filter { f, .. }
2057            | HydroNode::FilterMap { f, .. }
2058            | HydroNode::Inspect { f, .. }
2059            | HydroNode::Reduce { f, .. }
2060            | HydroNode::ReduceKeyed { f, .. } => {
2061                transform(f);
2062            }
2063            HydroNode::Fold { init, acc, .. } | HydroNode::FoldKeyed { init, acc, .. } => {
2064                transform(init);
2065                transform(acc);
2066            }
2067            HydroNode::Network {
2068                serialize_fn,
2069                deserialize_fn,
2070                ..
2071            } => {
2072                if let Some(serialize_fn) = serialize_fn {
2073                    transform(serialize_fn);
2074                }
2075                if let Some(deserialize_fn) = deserialize_fn {
2076                    transform(deserialize_fn);
2077                }
2078            }
2079            HydroNode::Counter { duration, .. } => {
2080                transform(duration);
2081            }
2082        }
2083    }
2084
2085    pub fn metadata(&self) -> &HydroIrMetadata {
2086        match self {
2087            HydroNode::Placeholder => {
2088                panic!()
2089            }
2090            HydroNode::Source { metadata, .. } => metadata,
2091            HydroNode::CycleSource { metadata, .. } => metadata,
2092            HydroNode::Tee { metadata, .. } => metadata,
2093            HydroNode::Persist { metadata, .. } => metadata,
2094            HydroNode::Unpersist { metadata, .. } => metadata,
2095            HydroNode::Delta { metadata, .. } => metadata,
2096            HydroNode::Chain { metadata, .. } => metadata,
2097            HydroNode::CrossProduct { metadata, .. } => metadata,
2098            HydroNode::CrossSingleton { metadata, .. } => metadata,
2099            HydroNode::Join { metadata, .. } => metadata,
2100            HydroNode::Difference { metadata, .. } => metadata,
2101            HydroNode::AntiJoin { metadata, .. } => metadata,
2102            HydroNode::ResolveFutures { metadata, .. } => metadata,
2103            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2104            HydroNode::Map { metadata, .. } => metadata,
2105            HydroNode::FlatMap { metadata, .. } => metadata,
2106            HydroNode::Filter { metadata, .. } => metadata,
2107            HydroNode::FilterMap { metadata, .. } => metadata,
2108            HydroNode::DeferTick { metadata, .. } => metadata,
2109            HydroNode::Enumerate { metadata, .. } => metadata,
2110            HydroNode::Inspect { metadata, .. } => metadata,
2111            HydroNode::Unique { metadata, .. } => metadata,
2112            HydroNode::Sort { metadata, .. } => metadata,
2113            HydroNode::Fold { metadata, .. } => metadata,
2114            HydroNode::FoldKeyed { metadata, .. } => metadata,
2115            HydroNode::Reduce { metadata, .. } => metadata,
2116            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2117            HydroNode::Network { metadata, .. } => metadata,
2118            HydroNode::Counter { metadata, .. } => metadata,
2119        }
2120    }
2121
2122    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2123        match self {
2124            HydroNode::Placeholder => {
2125                panic!()
2126            }
2127            HydroNode::Source { metadata, .. } => metadata,
2128            HydroNode::CycleSource { metadata, .. } => metadata,
2129            HydroNode::Tee { metadata, .. } => metadata,
2130            HydroNode::Persist { metadata, .. } => metadata,
2131            HydroNode::Unpersist { metadata, .. } => metadata,
2132            HydroNode::Delta { metadata, .. } => metadata,
2133            HydroNode::Chain { metadata, .. } => metadata,
2134            HydroNode::CrossProduct { metadata, .. } => metadata,
2135            HydroNode::CrossSingleton { metadata, .. } => metadata,
2136            HydroNode::Join { metadata, .. } => metadata,
2137            HydroNode::Difference { metadata, .. } => metadata,
2138            HydroNode::AntiJoin { metadata, .. } => metadata,
2139            HydroNode::ResolveFutures { metadata, .. } => metadata,
2140            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2141            HydroNode::Map { metadata, .. } => metadata,
2142            HydroNode::FlatMap { metadata, .. } => metadata,
2143            HydroNode::Filter { metadata, .. } => metadata,
2144            HydroNode::FilterMap { metadata, .. } => metadata,
2145            HydroNode::DeferTick { metadata, .. } => metadata,
2146            HydroNode::Enumerate { metadata, .. } => metadata,
2147            HydroNode::Inspect { metadata, .. } => metadata,
2148            HydroNode::Unique { metadata, .. } => metadata,
2149            HydroNode::Sort { metadata, .. } => metadata,
2150            HydroNode::Fold { metadata, .. } => metadata,
2151            HydroNode::FoldKeyed { metadata, .. } => metadata,
2152            HydroNode::Reduce { metadata, .. } => metadata,
2153            HydroNode::ReduceKeyed { metadata, .. } => metadata,
2154            HydroNode::Network { metadata, .. } => metadata,
2155            HydroNode::Counter { metadata, .. } => metadata,
2156        }
2157    }
2158
2159    pub fn print_root(&self) -> String {
2160        match self {
2161            HydroNode::Placeholder => {
2162                panic!()
2163            }
2164            HydroNode::Source { source, .. } => format!("Source({:?})", source),
2165            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2166            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2167            HydroNode::Persist { .. } => "Persist()".to_string(),
2168            HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2169            HydroNode::Delta { .. } => "Delta()".to_string(),
2170            HydroNode::Chain { first, second, .. } => {
2171                format!("Chain({}, {})", first.print_root(), second.print_root())
2172            }
2173            HydroNode::CrossProduct { left, right, .. } => {
2174                format!(
2175                    "CrossProduct({}, {})",
2176                    left.print_root(),
2177                    right.print_root()
2178                )
2179            }
2180            HydroNode::CrossSingleton { left, right, .. } => {
2181                format!(
2182                    "CrossSingleton({}, {})",
2183                    left.print_root(),
2184                    right.print_root()
2185                )
2186            }
2187            HydroNode::Join { left, right, .. } => {
2188                format!("Join({}, {})", left.print_root(), right.print_root())
2189            }
2190            HydroNode::Difference { pos, neg, .. } => {
2191                format!("Difference({}, {})", pos.print_root(), neg.print_root())
2192            }
2193            HydroNode::AntiJoin { pos, neg, .. } => {
2194                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2195            }
2196            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
2197            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
2198            HydroNode::Map { f, .. } => format!("Map({:?})", f),
2199            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2200            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2201            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2202            HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2203            HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2204            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2205            HydroNode::Unique { .. } => "Unique()".to_string(),
2206            HydroNode::Sort { .. } => "Sort()".to_string(),
2207            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2208            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2209            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2210            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2211            HydroNode::Network { to_location, .. } => format!("Network(to {:?})", to_location),
2212            HydroNode::Counter { tag, duration, .. } => {
2213                format!("Counter({:?}, {:?})", tag, duration)
2214            }
2215        }
2216    }
2217}
2218
2219#[cfg(feature = "build")]
2220#[expect(clippy::too_many_arguments, reason = "networking internals")]
2221fn instantiate_network<'a, D: Deploy<'a>>(
2222    from_location: &LocationId,
2223    from_key: Option<usize>,
2224    to_location: &LocationId,
2225    to_key: Option<usize>,
2226    nodes: &HashMap<usize, D::Process>,
2227    clusters: &HashMap<usize, D::Cluster>,
2228    externals: &HashMap<usize, D::ExternalProcess>,
2229    compile_env: &D::CompileEnv,
2230) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>) {
2231    let ((sink, source), connect_fn) = match (from_location, to_location) {
2232        (LocationId::Process(from), LocationId::Process(to)) => {
2233            let from_node = nodes
2234                .get(from)
2235                .unwrap_or_else(|| {
2236                    panic!("A process used in the graph was not instantiated: {}", from)
2237                })
2238                .clone();
2239            let to_node = nodes
2240                .get(to)
2241                .unwrap_or_else(|| {
2242                    panic!("A process used in the graph was not instantiated: {}", to)
2243                })
2244                .clone();
2245
2246            let sink_port = D::allocate_process_port(&from_node);
2247            let source_port = D::allocate_process_port(&to_node);
2248
2249            (
2250                D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2251                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
2252            )
2253        }
2254        (LocationId::Process(from), LocationId::Cluster(to)) => {
2255            let from_node = nodes
2256                .get(from)
2257                .unwrap_or_else(|| {
2258                    panic!("A process used in the graph was not instantiated: {}", from)
2259                })
2260                .clone();
2261            let to_node = clusters
2262                .get(to)
2263                .unwrap_or_else(|| {
2264                    panic!("A cluster used in the graph was not instantiated: {}", to)
2265                })
2266                .clone();
2267
2268            let sink_port = D::allocate_process_port(&from_node);
2269            let source_port = D::allocate_cluster_port(&to_node);
2270
2271            (
2272                D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2273                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
2274            )
2275        }
2276        (LocationId::Cluster(from), LocationId::Process(to)) => {
2277            let from_node = clusters
2278                .get(from)
2279                .unwrap_or_else(|| {
2280                    panic!("A cluster used in the graph was not instantiated: {}", from)
2281                })
2282                .clone();
2283            let to_node = nodes
2284                .get(to)
2285                .unwrap_or_else(|| {
2286                    panic!("A process used in the graph was not instantiated: {}", to)
2287                })
2288                .clone();
2289
2290            let sink_port = D::allocate_cluster_port(&from_node);
2291            let source_port = D::allocate_process_port(&to_node);
2292
2293            (
2294                D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2295                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
2296            )
2297        }
2298        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
2299            let from_node = clusters
2300                .get(from)
2301                .unwrap_or_else(|| {
2302                    panic!("A cluster used in the graph was not instantiated: {}", from)
2303                })
2304                .clone();
2305            let to_node = clusters
2306                .get(to)
2307                .unwrap_or_else(|| {
2308                    panic!("A cluster used in the graph was not instantiated: {}", to)
2309                })
2310                .clone();
2311
2312            let sink_port = D::allocate_cluster_port(&from_node);
2313            let source_port = D::allocate_cluster_port(&to_node);
2314
2315            (
2316                D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2317                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
2318            )
2319        }
2320        (LocationId::ExternalProcess(from), LocationId::Process(to)) => {
2321            let from_node = externals
2322                .get(from)
2323                .unwrap_or_else(|| {
2324                    panic!(
2325                        "A external used in the graph was not instantiated: {}",
2326                        from
2327                    )
2328                })
2329                .clone();
2330
2331            let to_node = nodes
2332                .get(to)
2333                .unwrap_or_else(|| {
2334                    panic!("A process used in the graph was not instantiated: {}", to)
2335                })
2336                .clone();
2337
2338            let sink_port = D::allocate_external_port(&from_node);
2339            let source_port = D::allocate_process_port(&to_node);
2340
2341            from_node.register(from_key.unwrap(), sink_port.clone());
2342
2343            (
2344                (
2345                    parse_quote!(DUMMY),
2346                    D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2347                ),
2348                D::e2o_connect(&from_node, &sink_port, &to_node, &source_port),
2349            )
2350        }
2351        (LocationId::ExternalProcess(_from), LocationId::Cluster(_to)) => {
2352            todo!("NYI")
2353        }
2354        (LocationId::ExternalProcess(_), LocationId::ExternalProcess(_)) => {
2355            panic!("Cannot send from external to external")
2356        }
2357        (LocationId::Process(from), LocationId::ExternalProcess(to)) => {
2358            let from_node = nodes
2359                .get(from)
2360                .unwrap_or_else(|| {
2361                    panic!("A process used in the graph was not instantiated: {}", from)
2362                })
2363                .clone();
2364
2365            let to_node = externals
2366                .get(to)
2367                .unwrap_or_else(|| {
2368                    panic!("A external used in the graph was not instantiated: {}", to)
2369                })
2370                .clone();
2371
2372            let sink_port = D::allocate_process_port(&from_node);
2373            let source_port = D::allocate_external_port(&to_node);
2374
2375            to_node.register(to_key.unwrap(), source_port.clone());
2376
2377            (
2378                (
2379                    D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
2380                    parse_quote!(DUMMY),
2381                ),
2382                D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
2383            )
2384        }
2385        (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => {
2386            todo!("NYI")
2387        }
2388        (LocationId::Tick(_, _), _) => panic!(),
2389        (_, LocationId::Tick(_, _)) => panic!(),
2390    };
2391    (sink, source, connect_fn)
2392}