Modules
This guide outlines the key aspects of building Module nodes that process nested graphs, considering the available Input and Output nodes.
This guide relies exclusively on the Dataflow approach to keep the code easy to follow. Once you have gone through this guide and the example provided, you should be able to implement Control flow using the instructions in the Control flow guide.
The core idea behind modules is to create distinct graphs featuring Input and Output nodes. The next step is to create a dedicated Module node whose ports mirror the Input and Output nodes found in the selected graph.
To begin, let's create a node that will serve as our input point:
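import { ClassicPreset } from "rete";

// assumed: a single socket instance shared by all nodes of the editor
const socket = new ClassicPreset.Socket("socket");

export class InputNode extends ClassicPreset.Node {
  // filled in by the Module node before the nested graph is processed
  public value: any = null;

  constructor(public key: string) {
    super("Input");
    this.addOutput("value", new ClassicPreset.Output(socket, "Number"));
  }

  data() {
    return { value: this.value };
  }
}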
The user-defined key is crucial: it associates the node with the matching input port of the Module node. We also need a value property, through which the input data is injected.
For a module to be of any use, an Output node is also necessary:
export class OutputNode extends ClassicPreset.Node {
  constructor(public key: string) {
    super("Output");
    this.addInput("value", new ClassicPreset.Input(socket, "Number"));
  }

  data() {
    return {};
  }
}
In this instance, the data method returns an empty object, because the input data can be obtained through the fetchInputs method without the node itself having to be executed.
The Module node, which serves as a portal into a nested graph and displays input and output ports, is the most complex node. Let's look at a simplified example:
export class ModuleNode extends ClassicPreset.Node {
  module: null | Module<Schemes> = null;

  constructor(path: string) {
    super("Module");
    this.setModule(path);
  }

  public async setModule(path: string) {
    this.module = findModule(path);

    // drop existing connections so that outdated ports can be removed safely
    await removeNodeConnections(this.id);
    if (this.module) {
      // getPorts may need to load the nested graph, so its result is awaited
      const { inputs, outputs } = await this.module.getPorts();
      syncPorts(this, inputs, outputs);
    } else {
      syncPorts(this, [], []);
    }
  }

  async data(inputs: Record<string, any>) {
    const data = await this.module?.exec(inputs);

    return data || {};
  }
}
where
- findModule returns an object representing a module, which gives access to its ports for display and executes the nested graph
- syncPorts updates input and output ports by removing outdated ones and adding new ones
- removeNodeConnections deletes all connections of the node, allowing us to remove ports if we need to switch modules
Keep in mind that making any dynamic changes to nodes, as seen in this example with syncPorts, requires calling area.update('node', node.id).
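The guide leaves syncPorts undefined. One possible shape is sketched below, assuming a node that exposes inputs and outputs maps together with addInput, removeInput, addOutput and removeOutput, as ClassicPreset.Node does. The PortHost interface, the createPort factory parameter and the StubNode class are hypothetical names introduced only so that the snippet runs without Rete.js.

```typescript
// Hypothetical structural view of a node: only the port-related members used here.
interface PortHost<P> {
  inputs: Record<string, P | undefined>;
  outputs: Record<string, P | undefined>;
  addInput(key: string, port: P): void;
  removeInput(key: string): void;
  addOutput(key: string, port: P): void;
  removeOutput(key: string): void;
}

// Removes ports whose keys are no longer listed and adds ports for new keys.
// Ports whose key is still listed are left untouched.
function syncPorts<P>(
  node: PortHost<P>,
  inputs: string[],
  outputs: string[],
  createPort: (key: string) => P // assumption: the caller decides how a port is built
): void {
  for (const key of Object.keys(node.inputs)) {
    if (inputs.indexOf(key) === -1) node.removeInput(key);
  }
  for (const key of inputs) {
    if (node.inputs[key] === undefined) node.addInput(key, createPort(key));
  }
  for (const key of Object.keys(node.outputs)) {
    if (outputs.indexOf(key) === -1) node.removeOutput(key);
  }
  for (const key of outputs) {
    if (node.outputs[key] === undefined) node.addOutput(key, createPort(key));
  }
}

// Stand-in node used only for this demonstration; ports are plain strings.
class StubNode implements PortHost<string> {
  inputs: Record<string, string | undefined> = {};
  outputs: Record<string, string | undefined> = {};
  addInput(key: string, port: string): void { this.inputs[key] = port; }
  removeInput(key: string): void { delete this.inputs[key]; }
  addOutput(key: string, port: string): void { this.outputs[key] = port; }
  removeOutput(key: string): void { delete this.outputs[key]; }
}

const stub = new StubNode();
syncPorts(stub, ["a", "b"], ["result"], (key) => "port:" + key);
// switching to a module with different ports: "a" and "result" go away, "c" is added
syncPorts(stub, ["b", "c"], [], (key) => "port:" + key);
```

In an actual editor, such a call would be followed by area.update('node', node.id) so that the rendered node reflects the new set of ports.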
To prevent conflicting calls from multiple Module nodes using the same nested graph, make sure to initialize a new editor and engine within the module.exec method.
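To see why a fresh instance per call matters, consider a deliberately tiny model of the problem. TinyGraph below is a hypothetical stand-in for a nested graph (it is not part of Rete.js): it holds one injected input and produces its output asynchronously. When two callers share one instance, the second injection overwrites the first before processing has finished.

```typescript
// Hypothetical stand-in for a nested graph: one injected input, one output that doubles it.
class TinyGraph {
  input = 0;

  async run(): Promise<number> {
    await Promise.resolve(); // simulates asynchronous node processing
    return this.input * 2;
  }
}

// Risky: every caller injects into the same instance.
const sharedGraph = new TinyGraph();
async function execShared(value: number): Promise<number> {
  sharedGraph.input = value;
  return sharedGraph.run();
}

// Safer: every call builds its own instance, mirroring "new editor and engine per exec".
async function execIsolated(value: number): Promise<number> {
  const graph = new TinyGraph();
  graph.input = value;
  return graph.run();
}

async function compare(): Promise<{ shared: number[]; isolated: number[] }> {
  const shared = await Promise.all([execShared(1), execShared(2)]);
  const isolated = await Promise.all([execIsolated(1), execIsolated(2)]);
  return { shared, isolated };
}
// compare() resolves to { shared: [4, 4], isolated: [2, 4] }:
// with the shared instance, the first caller silently receives the second caller's result.
```

Creating the editor and engine inside exec costs an extra graph import per call, but it removes this whole class of interference between Module nodes.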
Here's a simplified example of how a nested graph processor can be implemented:
import { NodeEditor } from "rete";
import { DataflowEngine } from "rete-engine";

function findModule(path: string) {
  return {
    // async because the nested graph is loaded first; callers need to await the result
    async getPorts() {
      const editor = new NodeEditor<Schemes>();
      await importGraphByPath(path, editor);

      const nodes = editor.getNodes();
      const inputs = nodes
        .filter((n): n is InputNode => n instanceof InputNode)
        .map((n) => n.key);
      const outputs = nodes
        .filter((n): n is OutputNode => n instanceof OutputNode)
        .map((n) => n.key);

      return {
        inputs,
        outputs
      };
    },
    exec: async (inputData: Record<string, any>) => {
      // a fresh engine and editor per call keep concurrent executions isolated
      const engine = new DataflowEngine<Schemes>();
      const editor = new NodeEditor<Schemes>();

      editor.use(engine);
      await importGraphByPath(path, editor);

      const nodes = editor.getNodes();

      injectInputs(nodes, inputData);
      return retrieveOutputs(nodes, engine);
    }
  };
}
where
- getPorts retrieves the keys of both Input and Output nodes and returns them
- importGraphByPath is supposed to load the essential nodes and connections for your module into the editor
Each call creates a new instance of the editor to avoid conflicts when processing the graph.
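How importGraphByPath loads a graph depends entirely on where your modules are stored. The sketch below is one hypothetical variant that reads from an in-memory map. The SavedGraph format, the GraphTarget interface, the store and createNode parameters, and the RecordingEditor class are all assumptions made for illustration. The only thing taken from Rete.js is the idea that the editor adds nodes and connections asynchronously.

```typescript
// Hypothetical serialized form of a module graph; the real format is up to the application.
type SavedNode = { id: string; type: string; key?: string };
type SavedConnection = { source: string; sourceOutput: string; target: string; targetInput: string };
type SavedGraph = { nodes: SavedNode[]; connections: SavedConnection[] };

// Minimal structural view of an editor: asynchronous addNode and addConnection.
interface GraphTarget<N> {
  addNode(node: N): Promise<boolean>;
  addConnection(connection: SavedConnection): Promise<boolean>;
}

async function importGraphByPath<N>(
  path: string,
  editor: GraphTarget<N>,
  store: Record<string, SavedGraph | undefined>, // assumption: modules live in an in-memory map
  createNode: (saved: SavedNode) => N // assumption: turns a saved record into a node instance
): Promise<void> {
  const graph = store[path];
  if (!graph) throw new Error("Module not found: " + path);

  // nodes go first so that every connection refers to nodes already in the editor
  for (const saved of graph.nodes) {
    await editor.addNode(createNode(saved));
  }
  for (const connection of graph.connections) {
    await editor.addConnection(connection);
  }
}

// Stand-in editor that simply records what it receives.
class RecordingEditor implements GraphTarget<string> {
  nodes: string[] = [];
  connections: SavedConnection[] = [];
  async addNode(node: string): Promise<boolean> { this.nodes.push(node); return true; }
  async addConnection(connection: SavedConnection): Promise<boolean> {
    this.connections.push(connection);
    return true;
  }
}

const moduleStore: Record<string, SavedGraph | undefined> = {
  "math/passthrough": {
    nodes: [
      { id: "in", type: "Input", key: "x" },
      { id: "out", type: "Output", key: "result" }
    ],
    connections: [{ source: "in", sourceOutput: "value", target: "out", targetInput: "value" }]
  }
};

const recorder = new RecordingEditor();
const imported = importGraphByPath(
  "math/passthrough",
  recorder,
  moduleStore,
  (saved) => saved.type + ":" + (saved.key ?? "")
);
```

In a real project, createNode would construct InputNode, OutputNode and the other node classes, and the connection loop would build actual connection objects rather than pass plain records through.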
The following function injects the input data of the Module node into the Input nodes of the nested graph:
function injectInputs(nodes: Schemes["Node"][], inputData: Record<string, any>) {
  const inputNodes = nodes.filter((node): node is InputNode => node instanceof InputNode);

  inputNodes.forEach((node) => {
    // an input port may have no connections; at most one connection is assumed,
    // so only the first value is used (undefined when the port is unconnected)
    node.value = inputData[node.key]?.[0];
  });
}
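The [0] index reflects how the dataflow engine hands data to a node: each input port receives an array holding one value per incoming connection. The sketch below replays that shape with a hypothetical StubInput class (standing in for InputNode) and a hypothetical injectInto helper, to show what ends up in value for a connected and an unconnected port.

```typescript
// Hypothetical stand-in for InputNode: only the members that injection touches.
class StubInput {
  key: string;
  value: unknown = null;

  constructor(key: string) {
    this.key = key;
  }
}

// Same logic as the injection step, restricted to the stand-in type.
function injectInto(nodes: StubInput[], inputData: Record<string, unknown[] | undefined>): void {
  for (const node of nodes) {
    // first connection wins; an unconnected port yields undefined
    node.value = inputData[node.key]?.[0];
  }
}

const portA = new StubInput("a");
const portB = new StubInput("b");

// port "a" has one incoming connection carrying 5; port "b" is not connected
injectInto([portA, portB], { a: [5] });
// portA.value is 5, portB.value is undefined
```

If the nested graph should see a default instead of undefined for unconnected ports, the injection step is a natural place to apply it.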
Once the input data has been injected, the next step is to retrieve the output from the nodes:
async function retrieveOutputs(nodes: Schemes["Node"][], engine: DataflowEngine<Schemes>) {
  const outputNodes = nodes.filter((node): node is OutputNode => node instanceof OutputNode);

  // Output nodes are independent of each other, so they can be processed concurrently
  const outputs = await Promise.all(outputNodes.map(async (node) => {
    const data = await engine.fetchInputs(node.id);

    // at most one input connection is assumed, so only the first value is used;
    // optional chaining covers an Output node that has no connection at all
    return [node.key, data.value?.[0]] as const;
  }));

  return Object.fromEntries(outputs);
}
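As a rough illustration of the result shape, the sketch below runs the same collection logic against a hypothetical StubEngine whose fetchInputs returns canned data in place of DataflowEngine. The collectOutputs helper, the node ids, the keys and the values are all made up for the example.

```typescript
type StubOutput = { id: string; key: string };

// Hypothetical engine: returns pre-recorded input data per node id, shaped like
// the per-port arrays that a dataflow engine provides.
class StubEngine {
  private data: Record<string, Record<string, unknown[]> | undefined>;

  constructor(data: Record<string, Record<string, unknown[]> | undefined>) {
    this.data = data;
  }

  async fetchInputs(nodeId: string): Promise<Record<string, unknown[] | undefined>> {
    return this.data[nodeId] ?? {};
  }
}

async function collectOutputs(
  nodes: StubOutput[],
  engine: StubEngine
): Promise<Record<string, unknown>> {
  const result: Record<string, unknown> = {};

  await Promise.all(nodes.map(async (node) => {
    const data = await engine.fetchInputs(node.id);
    // the result is keyed by the Output node's key, not by its id
    result[node.key] = data.value?.[0];
  }));

  return result;
}

const demoEngine = new StubEngine({ n1: { value: [42] } });
const demoOutputs = collectOutputs(
  [{ id: "n1", key: "sum" }, { id: "n2", key: "unused" }],
  demoEngine
);
// demoOutputs resolves to { sum: 42, unused: undefined }
```

The keys of the resulting object are what the Module node returns from data, so they line up with the output ports created from the Output nodes' keys.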
Check out the complete result on the Modules example page. Additionally, this approach is implemented in the Allmatter application.