Plumb tutorial

In this document a fairly complex plumb script will be built step by step. Each new improvement will introduce a new aspect of plumb scripting.

0. Introduction

The most common use case for plumb is to simplify the main loops of cooperating programs by removing the need for asynchronous read/write (select(2)/poll(2)). Instead of a single monolithic application that tries to do all the tasks, it is preferable to split the work into multiple small tools, each doing one task. This works fine in shell as long as the data flow is linear, from left to right, as represented by pipelines. In complex setups such a linear pipeline is not enough: a graph of piping is required, sometimes with multiple loops.

The two main features of plumb that help solve this problem are that pipes can form an arbitrary graph (including loops) instead of a single left-to-right chain, and that virtual processes (hubs, timers, filters) take care of the plumbing that would otherwise require asynchronous I/O in each tool.

The example in this document will demonstrate how to build a script that implements a simple control loop in a simulated environment. Both the controller and the simulator implement a simple, blocking main loop.

The simulator (fan.c, below) simulates a thermal system with a heat producer, a heat sink, a controllable fan and a sensor that measures the temperature of the sink. The purpose of the control loop is to keep the temperature around a predefined target value. The simulation will be the same for all examples.

Example: fan.c

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>

/* target: 60 C
 load   fan
 0.651  1
*/

double air  = 25.0;        /* [C] air temp */
double temp = 25.0;        /* [C] starting temperature is room temperature */
double load = 0.2;         /* load percent generating the heat */
double fan_rpm = 0.0;      /* current rpm of the fan in percent */
double fan_rpm_min = 0.2;  /* minimum fan rpm, if non-zero; anything between zero and this value will become this value */

double cm_sink = 40.0;
double cm_air  = 5.0;

void sim(void)
{
	double rpm, Q_out, Q_in;
	/* the device is heating heat sink with Q_in energy */
	Q_in = load * 1000.0;
	temp += Q_in / (cm_sink);

	/* calculate Q_out depending on fan rpm using a nonlinear fan function */
	rpm = fan_rpm;
	if ((rpm < fan_rpm_min) && (rpm != 0))
		rpm = fan_rpm_min;
	else if (rpm > 1.0)
		rpm = 1.0;
	Q_out = (log(rpm*3.0+1.0)+0.2);
//printf("rpm=%f Qin=%f Qout=%f\n", rpm, Q_in, Q_out);
	temp -= Q_out / (cm_air) * (temp - air);
	if (temp < air)
		temp = air;
}

int main(int argc, char *argv[])
{
	float f;
	unsigned long int last_t, t;
	char line[1024];


	last_t = time(NULL) - 2; /* make sure we have some iterations to run after the first control arrives */
	while(!(feof(stdin))) {
		/* read line by line */
		*line = 0;
		fgets(line, sizeof(line), stdin);

		/* float means control, anything else is just tick for the timer */
		if (sscanf(line, "%f", &f) == 1)
			fan_rpm = f;


		/* run as many iterations as many seconds passed after the last invocation */
		for(t = time(NULL); last_t < t; last_t++) {
			load += (double)(rand() % 2000 - 1000) / 10000.0;
			if (load < 0.0)
				load = 0.0;
			if (load > 1.0)
				load = 1.0;

			sim();
			printf("%f  %f %f\n", temp, load, fan_rpm);
			fflush(stdout);
		}
	}
}

The controller is a script written in awk. It will evolve with the plumb script throughout the tutorial.

When running the tests, the output will have three columns of numbers: the first column is the current temperature, the second is the incoming heat (0..1), the third is the fan control (0..1).

1. "69" setup

First, a simple loop has to be made: the simulator reads fan control on stdin and writes sensor data (current temperature) on stdout. The control script writes fan control requests to stdout and reads temperature on stdin:

Example: example1.awk

# feed in something to get the loop started - turn off fan and see what happens
BEGIN { print "0.0"; fflush(); }

# got results, do the control
{
	print $0 > "/dev/stderr"
	temp=$1

# calculate control output, between 0 and 1
	ctrl = (temp - target) / 12.0
	if (ctrl < 0)
		ctrl = 0
	if (ctrl > 1)
		ctrl = 1

# print control output
	print ctrl
	fflush()

# also feed the simulator with ticks
	system("sleep 1")
	print "tick"
	fflush()
}

The plumb script that runs the two processes together:

Example: example1.pb

# We have a hardware simulator process:
sim={"./fan"}

# The following awk script is a very simple controller
control={gawk -v "target=60"  -f example1.awk }

# two pipes, for the traditional '69' setup:
sim:1 | control:0
control:1 | sim:0

# to see that it's working, redirect control's stderr to plumb's output:
control:2 | env:1

The full syntax can be found in the manual plumb(5). A brief extract for understanding the script: a real process is created with name={cmd args}; pipelines work like in shell; file descriptors of already created processes are referenced as name:num; the default file descriptor assignment is the same as in shell (0=stdin, 1=stdout, 2=stderr). However, there is no default stdio binding: none of the fds of a process is redirected implicitly, not even stderr, so a plumb script must always be explicit about all fds. A virtual process named "env" is created automatically on startup; binding any fd of env binds the given fd of plumb itself (env:0 is the stdin of plumb).


On the plumbviz drawing generated for this script, all grey boxes are implicit objects created by plumb's stdscript on startup; the example script itself is in the box called "-f example1.pb".

The script names its processes (sim and control); besides the name being required for fd references (which is essential for piping), naming most processes is good practice for the readability of both the script and the drawing. The drawing is generated using plumbviz(1).

There is a trick built into fan.c for avoiding deadlocks. The loop depends on data circulating: if both the control script and the simulator sit waiting for their first input before producing output, the loop never starts. As a quick fix, fan.c simulates two iterations on startup, which makes it emit two lines of output that start the loop. This introduces a fixed delay of 2 iterations in the control loop: by the time the control script reads a sensor line, the simulation is already 2 iterations in the future (2 iterations of sensor data are waiting in a buffer).

The simulation runs as fast as input flows, and the control loop immediately responds to any sensor output. Without artificial delays the whole simulation would run at maximum speed, taking up a lot of resources on the host system. To make the simulation run in "real time", there is an artificial delay built in: the awk script waits a second before generating the next output.

Example 3 demonstrates a more elegant way of timing.

Finally, the standard error output of the control process is redirected to the standard output of plumb. This will make the whole process visible to the user.

2. compactness

It is possible to create a new process in the middle of a pipeline. This saves some typing without ruining readability, as long as the script highlights the main data flows using pipelines. Even loop-back pipes from the end of the pipeline to the front work: the only requirement for referencing a process is that the parser must have already seen it.

Example: example2.pb

# The simulator and a very simple controller, created in the middle of a
# single pipeline; the end of the pipeline loops back to sim's stdin:
sim={"./fan"} | control={gawk -v "target=60"  -f example1.awk } | sim:0

# to see that it's working, redirect control's stderr to plumb's output:
control:2 | env:1


3. timer

In the previous two examples timing was done by the awk script. This has multiple drawbacks. In this step a timer is introduced to provide a fixed time base for the control loop. With the previous setup the time base was not precise: the time between two iterations was the sum of the time sleep spent idling plus the overhead of calling sleep and of awk processing input and generating output. In the long run this small, unpredictable overhead accumulates.

An alternative solution is to let plumb do the timing: a [timer] virtual process emits tick records at a fixed period, and a [hub] placed in front of the simulator merges those ticks with the control stream.

The timer runs independently of the control loop processes and generates output that is delivered to fan.c even if both processes are blocking on read. Therefore this setup also solves the startup deadlock problem.

Example: example3.pb

# We have a hardware simulator process and a controller:
sim=[hub] | {"./fan"} | control={gawk -v "target=60"  -f example3.awk } | sim:*

# timer ticks for running the sim
time_base=[timer period=0.5 repeat=0] | sim:*

# to see that it's working, redirect control's stderr to plumb's output:
control:2 | env:1


The awk control script got shorter: the timing and the loop-startup workaround are thrown out:

Example: example3.awk

# whenever we got results, do the control
{
	print $0 > "/dev/stderr"
	temp=$1

# calculate control output, between 0 and 1
	ctrl = (temp - target) / 12.0
	if (ctrl < 0)
		ctrl = 0
	if (ctrl > 1)
		ctrl = 1

# print control output
	print ctrl
	fflush()
}

For referencing file descriptors of the [hub], sim:* is used. This syntax is common with [hub]s: "*" means "take the first free file descriptor". The feature is useful with any virtual process that does not use dedicated file descriptors, such as a [hub]. Not having a dedicated file descriptor assignment means there is no stdio and no convention for fd numbers: any file descriptor can be either input or output (and the direction is obvious from the script). NOTE: for plumb, stdio does not exist; it is only a convention that real processes, most virtual processes and the user follow.

The rule for merging streams at the [hub] is simple: whenever a record is read on a sink fd (input), it is copied to all source fds (outputs). As long as the control script writes a full line at once and flushes it, and the timer writes a full line at once, the [hub] will receive records that are each a full text line. The hub is guaranteed not to mix or merge the content of different records, so the output will consist of intact text lines.

NOTE: this implementation is not yet line-safe; the next step will fix that.

4. line splitting and stdio

Example 3 makes assumptions about the control process and about plumb's input buffering: the control process flushes after each write, the amount of bytes written is small, and the delay between two writes is large enough that each whole line arrives as a single record (a single read(2)). Such assumptions tend to break under extreme conditions, or as the script and the traffic on the pipes grow.

Plumb does not do implicit line splitting/buffering, but explicit line splitting is easy. There are two virtual processes that should be used in series: the first breaks the stream into one line per record (removing the separators), the second appends a single newline to the end of each record:

	 [split "\n\r"] | [affix suffix="\n"]
	
Since this is a very frequently used feature, there is a shorthand for this pipeline, called $LSP, in the library stdio. The first line of the new script includes the stdio library.

Example: example4.pb

include stdio

# We have a hardware simulator process and a controller:
sim=[hub] | {"./fan"} | control={gawk -v "target=60"  -f example3.awk } | $LSP | sim:*

# timer ticks for running the sim
time_base=[timer period=0.5 repeat=0] | sim:*

# to see that it's working, redirect control's stderr to plumb's output:
control:2 | stdout:*



Besides providing $LSP, stdio also hooks env:0, env:1 and env:2, connecting [hub]s to them, which allows multiple processes to read or write the stdio of plumb.

The only other change to the plumb script is the $LSP inserted at the output of the control script. Note: because of the lack of implicit line splitting, the traffic between fan and control is unfiltered. Control is an awk script, and awk does its own input line splitting; not having a line splitter between the two processes saves some CPU and makes the data flow smoother.

5. line prefixing

It is a common implementation pattern with plumb that scripts prefix their output lines to address different processes, and parse prefixes on input lines to identify the sender. This way one input (stdin) and one output (stdout) are enough for all processes. (The alternative is using multiple, higher file descriptors, which is also supported.)

Example: example5.pb

include stdio

# We have a hardware simulator process and a controller:
sim=[hub] | {"./fan"} | {gawk -v "target=60"  -f example5.awk } | $LSP | control=[hub]

# timer ticks for running the sim
time_base=[timer period=0.5 repeat=0] | sim:*

# loopback pipe
control:* | cm=[regex pattern="^ctrl " subst=""] | sim:*

# to see that it's working, redirect control's "log" to plumb's output:
control:* | lm=[regex pattern="^log "] | stdout:*





In this example the previous script is modified to read prefixed lines from the control script. When the script wants to send a line to the simulation, it prefixes the line with "ctrl "; when it wants to put a line in the log (to the stdout of plumb), the prefix is "log ". Since line splitting happens at the output, the new [hub] (called control) will output whole text lines. This stream is fed into two other pipelines, each regex-filtering for a specific prefix. The first also removes the prefix (substituting it with the empty string): the control commands looped back to the fan sim should contain nothing but numbers. The second pipe demonstrates how to keep the prefix, matching without substituting, and dumps the stream on plumb's stdout as-is.

These two pipelines run in parallel; the one that alters the stream does so on a copy, so they do not interfere. The one that only matches, without altering the stream, does not copy or buffer any data.

The only modification to the awk script since example 3 is that the output lines are prefixed:

Example: example5.awk

# whenever we got results, do the control
{
	print "log " $0
	temp=$1

# calculate control output, between 0 and 1
	ctrl = (temp - target) / 12.0
	if (ctrl < 0)
		ctrl = 0
	if (ctrl > 1)
		ctrl = 1

# print control output
	print "ctrl " ctrl
	fflush()
}

6. CLI with line prefixing

In the following example the same line prefixing idea is employed on the input side of the script. Lines coming from the fan simulator are prefixed with "sim ", lines from stdin are prefixed with "CLI ".

Example: example6.pb

include stdio

# We have a hardware simulator process and a controller:
sim_in=[hub] | {"./fan"} | sim_out=[hub] | $LSP | [affix prefix="sim "] \
  | control_in=[hub] | {gawk -v "target=60"  -f example6.awk } \
  | $LSP | control_out=[hub]

# timer ticks for running the sim
time_base=[timer period=0.5 repeat=0] | sim_in:*

# loopback pipe
control_out:* | cm=[regex pattern="^ctrl " subst="" pass=all] | sim_in:*

# to see that it's working, redirect control's "log" to plumb's output:
control_out:* | lm=[regex pattern="^log "] | stdout:*

# minimalistic cli: prefix and redirect plumb's stdin to the controller
stdin:* | $LSP | Cm=[affix prefix="CLI "] | control_in:*


The awk script is modified to process both input types:

Example: example6.awk

# whenever we got results, do the control
/^sim / {
	print "log " $0
	temp=$2

# calculate control output, between 0 and 1
	ctrl = (temp - target) / 12.0
	if (ctrl < 0)
		ctrl = 0
	if (ctrl > 1)
		ctrl = 1

# print control output
	print "ctrl " ctrl
	fflush()
	next
}

# change target from CLI
/^CLI target / {
	target = $3
	print "log new target temp: " target
	next
}

# catch-all rules
/^CLI / {
	print "log ERROR: unknown command on CLI: " $0
	fflush()
	next
}

{
	print "log ERROR: unknown input prefix: " $0
	fflush()
}

The only command the script accepts is target num, where num is a floating point number: the new target temperature.

Note: stdin:* needs line splitting because plumb does not handle input in any special way; it is just a stream of records of whatever size read(2) returns.

7. logging

Besides logging to stdout, it is useful to have the log in a file as well. In this version a [hub] is inserted after the log filter, and one output of the [hub] is also saved to a file created using [open]. The virtual process [open] follows the stdio convention: the log file will be opened for writing (O_WRONLY), since only the stdin of [open] is used. Other combinations (using stdout, or both stdin and stdout) also work as expected.

Example: example7.pb

include stdio

#configuration:
logfile="example7.log"

# We have a hardware simulator process and a controller:
sim_in=[hub] | {"./fan"} | sim_out=[hub] | $LSP | [affix prefix="sim "] \
  | control_in=[hub] | {gawk -v "target=60"  -f example6.awk } \
  | $LSP | control_out=[hub]

# timer ticks for running the sim
time_base=[timer period=0.5 repeat=0] | sim_in:*

# loopback pipe
control_out:* | cm=[regex pattern="^ctrl " subst="" pass=all] | sim_in:*

# to see that it's working, redirect control's "log" to plumb's output and to 
# a log file:
control_out:* | lm=[regex pattern="^log "] | loglines=[hub] | stdout:*
loglines:* | [open ${logfile}]


# minimalistic cli: prefix and redirect plumb's stdin to the controller
stdin:* | $LSP | Cm=[affix prefix="CLI "] | control_in:*


The file name is not hardwired at [open] but stored in a variable. The ${} variable substitution is chosen to ensure the file name is not interpreted: it may contain whitespace or even a full plumb script.

It is also possible, and good practice for larger scripts, to keep such settings in a separate file which the script then includes using the include statement.

8. tee: script reuse

What example7.pb did with the logfile is a generic idea that has been around in UNIX for a while, called tee(1). Tee is transparent on stdio, copying input to output; at the same time, the input is copied to a file as well. In plumb script, as demonstrated in example 7, this is a [hub] with 2 outputs and an [open] attached to one of them.

The script can be made more readable and somewhat shorter (if tee is used often) by having a generic implementation that can be inserted into any pipeline with a file name argument.

The following example demonstrates how to implement a subscript that can be easily reused with virtual process [new].

Example: example8.pb

include stdio

#configuration:
logfile="example8.log"

#implement a generic tee subscript
tee='
	env:0 | h=[hub] | env:1
	h:* | [open ${name}]
'

# We have a hardware simulator process and a controller:
sim_in=[hub] | {"./fan"} | sim_out=[hub] | $LSP | [affix prefix="sim "] \
  | control_in=[hub] | {gawk -v "target=60"  -f example6.awk } \
  | $LSP | control_out=[hub]

# timer ticks for running the sim
time_base=[timer period=0.5 repeat=0] | sim_in:*

# loopback pipe
control_out:* | cm=[regex pattern="^ctrl " subst="" pass=all] | sim_in:*

# to see that it's working, redirect control's "log" to plumb's output and to 
# a log file:
control_out:* | lm=[regex pattern="^log "] | loglines=[new tee name=${logfile}] | stdout:*


# minimalistic cli: prefix and redirect plumb's stdin to the controller
stdin:* | $LSP | Cm=[affix prefix="CLI "] | control_in:*


The subscript is specified as a variable, spanning multiple lines (for readability). When [new] is inserted in the logger pipeline, it creates an "instance" of the variable named by its first argument ("tee" in our case): it parses the variable and creates each object ([hub] and [open]) with its name prefixed by [new]'s own name (loglines) and a period. In the above example this means loglines.h for the [hub] and a randomly generated name prefixed with "loglines." for the [open]. The original fd bindings of [new] are rebound to the newly created processes, in place of the env: references (which stand for the environment of the subscript instance). The parent script normally does not need to access internal processes created by [new], so naming them is unnecessary except for internal references. Once [new] has finished, however, the created objects are not special: they live in the same global namespace as the parent script, and it is possible to bind directly to loglines.h. Still, using env: and multiple file descriptors as an API is encouraged over direct references to prefixed objects, for the readability of the script.

The only mandatory argument of [new] is the first one: the name of the variable that holds the subscript to instantiate. The rest of the arguments are optional key=value pairs. When present, they act like variable assignments visible only to the subscript and are used for passing arguments to it.

Note 1: the stdio library also implements the tee variable, the same way as this example script does, so it is not necessary to copy this specific subscript into user scripts.

Note 2: for subscripts taking no arguments, there is an even shorter way of referencing: a variable that contains the [new] command. This is how $LSP works: it contains "[new $LSP_]", which is interpreted by the parser when the $LSP or $(LSP) syntax is used. In turn, $LSP_ contains the subscript that splits the input into single-line records and ensures each record ends in a single newline.

9. EOF handling

Although this kills the simplicity of the tool and interferes with reusability by breaking the rule that every output line has the same format, for the sake of the tutorial a banner is printed at the top of the log file (and stdout). This is done by piping the output of an [echo] into the [hub] of the loglines tee (loglines.h).

Example: example9.pb

include stdio

#configuration:
logfile="example9.log"

#implement a generic tee subscript
tee='
	env:0 | h=[hub] | env:1
	h:* | [open ${name}]
'

# We have a hardware simulator process and a controller:
sim_in=[hub] | {"./fan"} | sim_out=[hub] | $LSP | [affix prefix="sim "] \
  | control_in=[hub] | {gawk -v "target=60"  -f example6.awk } \
  | $LSP | control_out=[hub]

# timer ticks for running the sim
time_base=[timer period=0.5 repeat=0] | sim_in:*

# loopback pipe
control_out:* | cm=[regex pattern="^ctrl " subst="" pass=all] | sim_in:*

# to see that it's working, redirect control's "log" to plumb's output and to 
# a log file:
control_out:* | lm=[regex pattern="^log "] | loglines=[new tee name=${logfile}] | stdout:*


# minimalistic cli: prefix and redirect plumb's stdin to the controller
stdin:* | $LSP | Cm=[affix prefix="CLI "] | control_in:*

# echo a banner in front of the stream
[echo "Fan control - plumb tutorial"] | [hub eof=ignore-on-sink] | loglines.h:*


There is an extra [hub] inserted right after the echo. This is required because echo sends an EOF after printing its message. By default, when this EOF reaches a [hub] (loglines.h), it shuts the hub down. In this script, shutting down the hub of tee would back-promote the EOF through the regex match and the control hub to the awk script, which would quit. In turn, the EOF on the stdin of the quitting awk would be back-promoted to the sim process, which would quit too; with both sticky processes ended, plumb would exit as well. To avoid this chain of unwanted EOFs rooted at [echo], the above mentioned hub is introduced with the eof=ignore-on-sink argument, which makes it ignore an EOF read on its input so that the EOF does not appear on its output.

Note 1: a similar thing happens if multiple processes of a large plumb script write stdout or stderr: the first process closing its output would shut down the whole script. Having a separate ignore-on-sink hub for each process would not be practical. Simply ignoring all EOFs on the stdout and stderr [hub]s would be bad too, as in some situations EOF back-promotion through them is the mechanism by which the script properly shuts down and quits. To overcome this problem, stdio provides two sets of [hub]s, one with lowercase names, the other with uppercase names. The lowercase variants (stdout, stderr) work without side effects; the uppercase variants (STDOUT, STDERR) ignore EOF. They can be used in parallel.

Note 2: the only reason the banner ends up as the first line of the log stream is that the control script writes its first line to the same pipe only after the first timer tick has gone through the loop. This would be a race condition if a real process were used in place of [echo], because it would depend on that process emitting output faster than the first timer message reaches the awk script. However, [echo] is a virtual process that emits its string almost immediately after being started, in the worst case in the same plumb iteration as [timer] runs. The message sent out by [timer] then has to pass through real processes, which takes several more plumb iterations (reading from fan and writing to awk, then reading from awk, at least). Thus the order is guaranteed.