
DP: A library for building portable,
reliable distributed applications

David M. Arnow
Brooklyn College, CUNY

ABSTRACT

DP is a library of process management and 
communication tools for writing portable, reliable 
distributed applications. It provides support for a 
flexible set of message operations as well as process 
creation and management. It has been successfully 
used in developing distributed Monte Carlo, disjunc-
tive programming and integer goal programming 
codes. It differs from PVM and similar libraries in its 
support for lightweight, unreliable messages, as well 
as asynchronous delivery of interrupt-generating 
messages.  In addition, DP supports the development 
of long-running distributed applications tolerant to 
the failure or loss of a subset of its processors. 

1.   Distributed Programming Tools

Although the increase in diversity and avail-
ability of parallel multiprocessors shows no sign of 
abatement, the one truly ubiquitous parallel computer 
system continues to be the LAN of workstations and 
the one massively parallel system to which "every-
one" has access, if not authorization, is the Internet. 
Recognition of network computing as an important 
platform for parallel computing and the desirability 
of high-level and portable programming systems has 
resulted in the widespread development of a host of 
message-passing based programming environments.

Initially, much of this effort went into the 
design of programming languages or language exten-
sions. An extensive review of these is given by Bal 
[Bal89]. Each language, besides providing a higher 
semantic level and portability, expresses a view as to 
how a distributed program ought to be conceived. 
These views may be limiting. For example, many 
languages (e.g. SR [Andrews82]) are strongly influ-
enced by the semantic restrictions (synchronous mes-
sage passing) advocated by Hoare [Hoare78]. Other 
languages adopt an asynchronous message semantics 
(for example, NIL [Strom83]). Still others hide mes-
sage passing altogether, and present a paradigm dif-
ferent from that of a distributed system. Most notable 
among these are parallel logic programming lan-
guages such as PARLOG [Clark88], the shared 
memory model of LINDA [Gelernter85], or more 
recently Concert/C [Aurebach92; Goldberg93].

Languages for distributed systems are neces-
sarily designed with a particular paradigm in mind 
and as such must impose some restrictions in order 
to maintain the integrity of that paradigm. The portability 
of their implementation is not trivial. Perhaps most 
significantly, new languages require a 
substantial reinvestment on the part of users. There-
fore, as the locus of interest in parallel network com-
puting changed from language designers to users, 
there has been a shift to the design of libraries of 
standard routines or of environments consisting of 
supporting processes as well as libraries. While the 
programmer is no longer protected by a language, a 
greater flexibility and portability can be achieved.

One such environment is PVM [Sunderam90; 
Geist92], which is implemented on a variety of Unix 
systems and enjoys extensive use by computational 
scientists. Others include NMP [Marsland91] and P4 
[Butler92]. The performance of these systems and 
others has recently been reviewed in two papers 
[Douglas93; Parsons94]. In response to both the 
proliferation of such environments and the use of 
PVM as a de facto standard, the past two years have 
seen an effort to develop a standard for these envi-
ronments, MPI [MPI93].

All of these environments provide varying 
degrees of flexibility, portability and scalability, with 
PVM providing the most. However, none of them 
offer the flexibility that my applications required. 
Furthermore, none offer the kind of reliability that is 
necessary for conveniently scaling up to long compu-
tations involving many workstations.

2.   Wanted: Portability, Flexibility and
Reliability

Portability. DP was developed as a result of my 
own experiences writing distributed programs that 
ran on LANs and the Internet itself from 1988-1991. 
These programs included Monte Carlo and other sci-
entific calculations as well as operations research 
programs. Writing the process management and 
communication code directly in the native system 
primitives was maddeningly non-portable even 
though all the systems involved were either some fla-
vor of Unix or inspired by Unix. The programs were 
parallelizations of large existing codes, and the nec-
essary interprocess communication was embedded 
deeply so rewriting these programs in a different lan-
guage was out of the question.

Flexibility. Most frustrating was the loss of flexibil-
ity (with respect to use of the native system primi-
tives) that results from the use of any of the other 
distributed languages or programming environments 
available then and now. The communication facili-
ties available in these systems (and described in the 
proposed MPI standard) do not support interrupting 
messages. Thus, a process receiving a message must 
invoke a receive operation explicitly at each point 
that a message is sought. The receive operation is 
typically allowed to be blocking or non-blocking, so 
both barriers and polling are readily available to the 
programmer.

In situations where a process cannot proceed at 
all until data from an incoming message has been 
received, these message semantics pose no problem: 
it is entirely natural to explicitly encode message 
receive operations just prior to the use of the 
needed data in the program and entirely proper for 
these operations to be blocking.

However, there are situations, for example in 
the Monte Carlo and in disjunctive programming 
applications in which I was interested, where:

(a) incoming data serves "merely" to increase 
the efficiency of the process's computation

and

(b) it is not certain that the incoming data will 
arrive at all!

In these situations, it is both unnatural and 
extremely inefficient to explicitly encode non-block-
ing receive operations in the process's application 
code. In some cases, the problem is mitigated by the 
availability of threads. A dedicated input thread can 
integrate the contents of an incoming message into 
the process's data objects without the need for poll-
ing. However, it may be that the only way to effi-
ciently respond to the new information is for the 
main thread to make an abrupt change in its control, 
i.e. to make a sudden jump out of its current nested 
routine stack. In the absence of an inter-thread sig-
naling facility, there is no way for the main thread to 
recognize the need for this short of testing an object 
in its own address space- cheaper than executing 
non-blocking receives, but still inefficient.

Another loss of flexibility is the inability to 
send fast UDP-style messages in situations where 
message unreliability may not be a serious drawback. 
Operating system services often have this character-
istic, but, surprisingly perhaps, so do some applica-
tions. Consider, for example, a large Monte Carlo 
calculation involving thousands of random walks. It 
is often the case that if a small fraction of these are, 
at random, lost (as a result of message loss), then the 
impact is "only" an increase in variance. If as a 
result of permitting such losses, the computation can 
run faster and hence have a greater sample size, then 
the benefit could outweigh the loss.
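
The trade-off can be made concrete under the usual assumption that the random walks are independent, so that the variance of the Monte Carlo mean is sigma^2/n and a random loss simply shrinks n. The following is a minimal sketch of that arithmetic only; the function names are illustrative and are not part of DP.

```c
#include <assert.h>

/* Variance of the mean of n_samples independent samples,
 * each with variance sigma2. */
double variance_of_mean(double sigma2, double n_samples)
{
    assert(n_samples > 0.0);
    return sigma2 / n_samples;
}

/* Number of walks that contribute to the result when a fraction `loss`
 * of them is dropped at random, but the cheaper messages let the run
 * complete `speedup` times as many walks in the same wall-clock time. */
double surviving_samples(double n_samples, double speedup, double loss)
{
    assert(speedup > 0.0 && loss >= 0.0 && loss < 1.0);
    return n_samples * speedup * (1.0 - loss);
}

/* Nonzero when the faster, lossy run ends with the smaller variance,
 * i.e. when speedup * (1 - loss) > 1. */
int lossy_run_wins(double speedup, double loss)
{
    return speedup * (1.0 - loss) > 1.0;
}
```

For instance, a 10% gain in throughput bought at the price of losing 2% of the walks leaves more surviving samples than the reliable run, whereas a 1% gain with a 5% loss does not.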

Reliability- for the sake of scalability. Although 
reliability was decidedly not an initial concern of this 
project, the project's own success forced the issue. 
The applications that most readily make use of DP 
are those which have a high computational cost and 
which are parallelizable. But by running a parallel-
ized application over a great number of workstations 
on a LAN for a long time, the likelihood of zero 
workstation reboots during the course of a single 
computation became uncomfortably low. In 
order to more fully realize the potential for the 
exploitation of networks and internetworks of work-
stations, greater reliability is essential. So, as the 
project developed, increased reliability (with respect 
to single workstation failure) became a goal.

 In summary, the DP library was designed with 
the following goals:

(1)	Flexibility and power: The primitives must 
provide the power to perform most distrib-
uted programming functions. The 
application programmer should not lose any 
functionality or efficiency by using DP 
instead of the native system primitives.

(2)	Portability: The primitives should be imple-
mentable on most, if not all, distributed 
computing platforms.

(3)	Reliability: the loss due to external 
circumstances of one or all processes on a 
single workstation should have no impact 
on the outcome of a distributed computation 
other than a short delay, provided that the 
processes involved do not conduct any i/o 
other than message sending and receiving.

With the exception of not providing a broad-
cast or multicast message facility, DP meets these 
goals. On the other hand, in comparison to other dis-
tributed programming environments and languages, 
DP provides a very low-level application interface. 
There is no typing of messages, minimal support of 
data format conversions, no queuing of synchronous 
messages, and no concept of conditional receives. 
There is, however, a higher-level distributed programming 
support environment, stdDP [Arnow94], 
that provides those services and is implemented 
using DP.

2.1  A sketch of a motivating application: 
capacitated warehouse location

To clarify the kind of problem that demands 
the interrupting message facility that is absent from 
other environments, we present one example: the 
capacitated warehouse location problem- a classic 
operations research problem.

The goal is to supply the needs of customers 
from a group of proposed warehouses and to mini-
mize monthly cost. Using a warehouse requires a 
fixed monthly charge and there is the cost of supply-
ing all or part of a customer's requirements from a 
given warehouse. The problem is to determine which 
warehouses to open so as to minimize cost while 
meeting demand. Although the problem is NP-hard, 
good results can be achieved using Branch-and-Cut 
and Branch-and-Bound techniques. Worker pro-
cesses are given portions of the search tree to explore 
and communicate intermediate results to one another. 
Idle worker processes are given new tasks by master 
processes, which must obtain these from busy 
worker processes. Worker processes can complete 
their tasks significantly more rapidly through pruning 
by "knowing" the current global minimal cost.

Both of these operations- obtaining the new 
global minimum and paring the current subtree to 
define and transmit a new task- are in response to 
arriving messages, the number of which is unknown. 
Polling, besides being inefficient, requires an inordinate 
modification of the original code. In both of 
these cases, efficiency and convenience are served by 
providing interrupting messages. This application 
and other related ones are described in [Arnow91, 
Arnow94, Arnow95].

3.   DP's services

Although of the same genus as environments 
such as PVM and P4, DP differs from each of them 
in a number of important ways, primarily because of 
the above goals. Functionally, the most important 
difference is its provision for unsolicited messages 
whose arrival generates a software interrupt. This provides 
a flexible method of sending large quantities of 
urgent information that cannot be easily accomplished 
with, say, the Unix-signal transmission of 
PVM. It also allows a programming environment to 
provide a shared-memory-like capability. DP permits 
messages to be sent in the cheapest way possible, 
when reliable transmission is not necessary. Mes-
sages can be received with or without blocking. Pro-
cess creation is dynamic and limited only by 
available computing resources. Furthermore, DP can 
guarantee the reliability of those processes that 
engage in no i/o other than message sending and 
receiving in the event of a single workstation failure. 
This section presents an overview of most of the ser-
vices provided by DP.

3.1  Process management

Execution. Execution of a DP application starts by 
invoking a suitably compiled and linked DP execut-
able program. In DP parlance, this process is called 
the primary, though its primacy is for the most part 
just a matter of being first- there is nothing very 
special about the primary. The primary process and 
its descendants (those processes that are spawned by it 
or its descendants) constitute a DP process group. 
DP processes can only communicate within this 
group.

Identification. Each DP process is identified by a 
value of type DPID, guaranteed to be unique among 
all possible DP processes. The function dpgetpid() 
returns the current process's id via a storeback 
parameter. Processes learn the DPIDs of other 
processes either by being their parent, by receiving a 
message from them, or when the DPID of a process 
is in the contents of a message and is used by the 
receiving process as such.

The hosts file. In order to spawn processes, a DP 
program must have information about the available 
hosts for process creation. DP processes can acquire 
that information dynamically but it is usually conve-
nient to provide that information to the primary via a 
hosts file in the directory from which the DP applica-
tion is executed. The primary will automatically read 
this file and prompt the user for the passwords 
needed to access the networks named. This informa-
tion is inherited by spawned processes.

The hosts table. Host information is maintained 
internally in a hosts table. In DP application code, 
hosts are identified using integer indices to this table. 
The table is dynamic- new hosts can be introduced 
during run-time by the function dpaddhost(). This 
call is an alternative and a supplement to providing 
host information through a static, though convenient, 
hosts file.

Process creation. Process are spawned by calling 
dpspawn(), passing

·	the name of the program to be executed,

·	the integer index of the host on which to run 
the new process,

·	a semantic packet to be sent to the new pro-
cess; this packet is a program-determined col-
lection of bytes that can be used as an initial 
parent-to-child communication- typically it 
contains the DPID of the parent, along with 
possibly other application parameters.

The call to dpspawn() returns the DPID of 
the new process via a storeback parameter. The entry 
point for the newly spawned secondary processes is 
main(), not the instruction after the call to 
dpspawn(). These secondary processes do not have 
access to the original command-line arguments nor 
do they inherit a copy of the creator's address 
space- their data must come from the semantic 
packet or from subsequent received messages. Guid-
ing dpspawn is the entry in the internal host table 
for the given host id. That entry, among other things, 
determines the user name under which the new pro-
cess will run and most significantly the directory in 
which the program must be found and in which the 
new process will start executing.

Initialization. The first DP call any DP program 
makes should be dpinit(), which sets up the nec-
essary process and communication environment. 
This includes initialization of DP's data structures 
and establishing an address. If the process calling 
dpinit() was created by dpspawn(), the caller is 
given access to the semantic packet, described above. 
A pointer to this packet is returned via storeback 
parameter to dpinit(). The size of the packet is 
also stored back. In the case of a primary process, 
there is no semantic packet and the size stored back 
is -1: that is how the code can determine whether it 
is running in the primary process or a secondary after 
a call to dpinit().

The call to dpinit() completes the hand-
shaking with the creating parent process. The creat-
ing process cannot continue its work until the created 
process makes this call. For this reason, the call to 
dpinit() should be made as soon as possible. 
Upon returning from dpinit(), the process is a 
genuine DP process and can partake in the activities 
of the DP family.

In all cases, dpinit() returns the number of 
host machines in its inherited internal host table and 
stores back the host id of the machine on which the 
process is running.

Joining a DP process group. Any non-DP process 
may join an existing DP process group. For this to be 
possible, one or more of the processes in the group 
must invoke dpinvite(). This call creates a con-
tact file, which contains all the information that a 
new process would normally get from dpspawn(). 
All the joining process need do is invoke dpjoin() 
with the pathname of the contact file as an argument. 
This call plays the role of dpinit() and establishes 
communication using information provided in the 
contact file. The newly joined DP process's identity 
can then be conveyed to any process in the group. 
Note that this mechanism requires the joining 
process and the inviting process to share some 
part of the file system name space.

Finishing Up. All DP processes must call 
dpexit() to make a graceful exit. The dpexit() 
function is the DP substitute for the Unix exit() call; 
that is, it makes a no-return exit. If a DP process fails 
to exit using dpexit(), i.e. if it exits using the 
Unix exit(), other DP processes in the application 
may fail. The main purpose of dpexit() is to with-
draw the exiting process from contact with the 
remaining DP processes prior to an actual exit in a 
way that guarantees correct message transmission. 
The only argument to the function is a string identi-
fying the reason for termination. The string appears 
only in the log file for the process and may be null.

Sometimes, it may be desirable for a process to 
cease DP activity but persist in some other activity. 
By passing the address of a function to dpsetexfun() 
any time prior to calling dpexit(), a process 
guarantees that dpexit(), after withdrawing 
from the group of DP processes, will call the indi-
cated function prior to doing the actual Unix exit.

Bailing out. The dpexit() call terminates one DP 
process in the group. Generally, each process's own 
logic dictates when that termination is appropriate. In 
exceptional circumstances, it may be necessary to 
allow a single process in the group to force termina-
tion in the entire group. In such a case, dpstop() 
can be called. The dpstop() call forces immediate 
shutdown of all processes. The function set by 
dpsetexfun() is not called and the ensuing shut-
down is so radical that even earlier messages that 
had been sent but were not yet delivered may be 
thrown away. The function receives one argument, a 
string, which has the same meaning as the string 
passed to dpexit().

3.2  Communication

Sending messages. DP processes communicate by 
sending and receiving messages. For sending mes-
sages, the dpwrite() routine requires the DPID of 
the recipient and a pointer to the start of the message 
body along with the message body size. A variant, 
dpsend(), allows a message body to be specified 
as a linked list. Messages can be reliable or non-reli-
able and interrupting or non-interrupting. Reliability 
here means that DP, which as I describe in section 5 
uses UDP as its underlying protocol, will carry out 
an ack/timeout/retransmit protocol that will guaran-
tee the eventual availability of the message to the tar-
get provided that the underlying network and 
relevant host machines do not fail. Reliable messages 
are received in the order in which they were sent. 
Sending the message unreliably means that DP will 
send the message to the target only once and assume 
no further responsibility- a much cheaper method 
of message transmission.
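
The ack/timeout/retransmit idea can be illustrated with a small simulation. This is only a sketch under stated assumptions: the lossy channel, the sequence numbering and all names below are hypothetical and do not describe DP's actual protocol. It also blocks the sender in a loop for simplicity, whereas dpwrite() returns at once.

```c
#include <assert.h>

/* Hypothetical lossy channel: silently drops the first `drops_left`
 * datagrams handed to it. */
struct lossy_channel {
    int drops_left;
    int last_seq_delivered;  /* receiver: highest sequence number accepted */
    int deliveries;          /* messages passed up to the application      */
};

/* One transmission attempt. Returns the receiver's cumulative ack, or -1
 * when the datagram was lost (which the sender sees as a timeout). A
 * duplicate is acknowledged again but not delivered a second time. */
static int transmit(struct lossy_channel *ch, int seq)
{
    if (ch->drops_left > 0) {
        ch->drops_left--;
        return -1;
    }
    if (seq == ch->last_seq_delivered + 1) {   /* next in order: accept */
        ch->last_seq_delivered = seq;
        ch->deliveries++;
    }
    return ch->last_seq_delivered;
}

/* Retransmit until acknowledged. Returns the number of transmissions
 * used, or -1 after max_tries unacknowledged attempts. */
int send_reliable(struct lossy_channel *ch, int seq, int max_tries)
{
    assert(max_tries > 0);
    for (int tries = 1; tries <= max_tries; tries++)
        if (transmit(ch, seq) >= seq)
            return tries;
    return -1;
}
```

An unreliable send in this model is a single call to transmit() whose result is ignored, which is why it is so much cheaper.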

Regardless of whether the message is sent reli-
ably, return to the sender is immediate; the sending 
process will not be blocked during this time. So upon 
return from dpwrite(), one thing is certain: the 
target has not yet received the message.

Receiving messages. Logically, each DP process has 
two receiving ports: one for receiving interrupting 
messages and another for receiving non-interrupting 
messages. Non-interrupting messages are queued 
upon arrival and do not affect the receiving process 
until it explicitly reads the message with the 
dprecv() call. In the case of the interrupting mes-
sage, the message's arrival may force the invocation 
of a special message-catching routine if such a rou-
tine has been designated by the receiving process via 
a call to dpcatchmsg(). Whether or not such a 
routine has been designated, the interrupting message 
must be read explicitly with the dpgetmsg() call, 
not the dprecv() call. Both routines return the 
DPID of the sender as well as the message itself and 
both routines move the incoming message from an 
internal DP buffer to an application-provided buffer. 
If the latter is insufficient to hold the message, the 
message is truncated. The dprecv() call can be 
made with or without blocking semantics, but the 
dpgetmsg() call, because it is typically used inside 
an interrupt handler where blocking would be inappropriate, 
never blocks. In the event that several 
interrupting messages arrive before the system has 
had a chance to invoke the message handler function, 
only one call to the message handler will be made, 
i.e., there is not a one-to-one correspondence 
between interrupting messages and calls to the handler. 
Hence, the message handler must assume 
that there may be more than one interrupting mes-
sage ready to be received.
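
The consequence for handler code is that it should drain its port rather than read once. The following self-contained sketch simulates this with a small in-memory queue; enqueue_message(), try_get_message() and NO_MESSAGE are stand-ins invented for the illustration (in a DP program the loop would be built around dpgetmsg()).

```c
#include <assert.h>

#define QUEUE_CAP   8
#define NO_MESSAGE  (-1)   /* stand-in for a "nothing pending" result */

static int queue[QUEUE_CAP];
static int head, tail;     /* head == tail means the queue is empty */

int handled_total;         /* sum of all message values processed   */
int handler_invocations;   /* how many times the handler was called */

/* Simulated arrival of an interrupting message.
 * Returns 0 if the queue is full, 1 otherwise. */
int enqueue_message(int value)
{
    int next = (tail + 1) % QUEUE_CAP;
    if (next == head)
        return 0;
    queue[tail] = value;
    tail = next;
    return 1;
}

/* Non-blocking read of one pending message. */
static int try_get_message(int *out)
{
    if (head == tail)
        return NO_MESSAGE;
    *out = queue[head];
    head = (head + 1) % QUEUE_CAP;
    return 0;
}

/* One invocation may have to serve several arrivals, so loop until
 * the port is empty instead of reading a single message. */
void message_handler(void)
{
    int v;

    handler_invocations++;
    while (try_get_message(&v) != NO_MESSAGE)
        handled_total += v;
}
```

A handler written with a single read would leave the second and third messages stranded until some later, unrelated arrival.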

Longjumps. Sometimes when the message-catching 
routine is invoked, it responds to the incoming infor-
mation by modifying a global data structure or send-
ing out a message with requested information. At 
other times, however, it must respond by making an 
exceptional change in the control flow of the receiv-
ing process. The dplongjmp() routine provides 
that capability. It works exactly as longjmp() does 
and in fact its argument is a jmp_buf that was set by 
setjmp() (there is no "dpsetjmp"). The only reason 
for dplongjmp() (instead of the standard 
longjmp()) is that the jump out of the message 
handler must be accompanied by a re-enabling of 
interrupting messages.
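
A rough analogue of this behavior exists in plain POSIX, and it may help to see it without DP. In the sketch below a signal stands in for the interrupting message, and sigsetjmp() with a nonzero second argument saves the signal mask so that siglongjmp() restores it, much as dplongjmp() re-enables interrupting messages. The names run_search, explore and on_urgent_message are illustrative, and the code assumes a single-threaded process.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <setjmp.h>
#include <signal.h>
#include <string.h>

static sigjmp_buf restart_point;
static volatile sig_atomic_t deepest_level;

/* Handler: abandon whatever nested work was interrupted. */
static void on_urgent_message(int signo)
{
    (void)signo;
    siglongjmp(restart_point, 1);
}

/* Deeply nested computation; at depth 3 an "urgent message" arrives. */
static void explore(int depth)
{
    assert(depth >= 0);
    deepest_level = depth;
    if (depth == 3)
        raise(SIGUSR1);          /* stand-in for a message arrival */
    if (depth < 10)
        explore(depth + 1);
}

/* Returns the depth at which the search was abandoned, or -1 if the
 * search ran to completion or the handler could not be installed. */
int run_search(void)
{
    struct sigaction sa;
    sigset_t msg_signal;

    memset(&sa, 0, sizeof sa);
    sa.sa_handler = on_urgent_message;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGUSR1, &sa, NULL) != 0)
        return -1;

    sigemptyset(&msg_signal);
    sigaddset(&msg_signal, SIGUSR1);
    sigprocmask(SIG_UNBLOCK, &msg_signal, NULL);

    /* Second argument 1: save the signal mask, so that the jump out of
     * the handler also re-enables delivery of SIGUSR1. */
    if (sigsetjmp(restart_point, 1) != 0)
        return (int)deepest_level;

    explore(0);
    return -1;
}
```

Calling run_search() twice in a row behaves the same both times, which would not be the case if the jump left the signal blocked as it is while the handler runs.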

3.3  Synchronization and timeouts

Critical sections. The application-specified mes-
sage-catching routine may be invoked at any time 
and may reference global objects. Thus, any other 
code that accesses these global objects is a one-way 
critical section, in the sense that, although control may 
transfer from the critical section to the handler upon 
receipt of an interrupting message, the reverse is not 
possible: control will not pass from the handler until 
it has completed its work and returned. To guarantee 
mutual exclusion, such access should be preceded by 
a call to dpblock() to disable calls to the interrupt 
handler and followed by a call to dpunblock() to 
re-enable them. Upon invoking dpunblock(), if 
any interrupting messages arrived since the call to 
dpblock(), the catching function will be invoked.
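
The same discipline can be demonstrated with ordinary POSIX signals, which may make the semantics easier to picture. In this sketch sigprocmask() plays the role of dpblock() and dpunblock(), and a signal raised inside the guarded region stands in for an interrupting message. It is an analogy under the assumption of a single-threaded process, not DP's implementation; guarded_update and on_message are illustrative names.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

/* Object shared between the main code and the handler. */
static volatile sig_atomic_t shared_counter;

static void on_message(int signo)
{
    (void)signo;
    shared_counter += 100;
}

/* Returns the counter value seen at the end of the guarded region and
 * stores in *after the value seen once the region has been left. */
int guarded_update(int *after)
{
    struct sigaction sa;
    sigset_t msg_signal;
    int inside;

    assert(after != NULL);
    memset(&sa, 0, sizeof sa);
    sa.sa_handler = on_message;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGUSR1, &sa, NULL) != 0)
        return -1;

    sigemptyset(&msg_signal);
    sigaddset(&msg_signal, SIGUSR1);
    shared_counter = 0;

    sigprocmask(SIG_BLOCK, &msg_signal, NULL);    /* like dpblock()   */
    shared_counter += 1;
    raise(SIGUSR1);        /* arrives mid-update: held pending        */
    shared_counter += 1;
    inside = (int)shared_counter;
    sigprocmask(SIG_UNBLOCK, &msg_signal, NULL);  /* like dpunblock() */

    /* The pending signal was delivered before the unblocking call
     * returned, so the handler's update is visible here. */
    *after = (int)shared_counter;
    return inside;
}
```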

Synchronization and Timeouts. Sometimes a pro-
cess needs to wait until some condition becomes 
true, typically as a result of incoming interrupting 
messages. The dppause() call suspends execution 
of the process until any asynchronous event takes 
place. The application may set a timer and a timeout 
function through non-zero arguments to this call. 
Upon entering dppause(), interrupting messages 
(and calls to the message catcher) are enabled and 
status is restored upon return. Typical use of this 
function is

dpblock();
while (!some_desired_condition)
		dppause(0, (FUNCPTR) 0);
dpunblock();

The intent of this code is not to proceed until 
some_desired_condition, which presumably 
depends on the arrival of a message, is true. Rather 
than busy-wait, the program calls dppause() which 
will not return until some event, possibly a message 
arrival, has taken place. Because many events are 
possible, the desired condition has to be rechecked 
and dppause() reentered if necessary. The window 
between the checking of the desired condition and 
entry into dppause() opens the possibility for a race 
condition and so the loop is enclosed by calls to 
dpblock() and dpunblock().
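
Readers familiar with POSIX may recognize this as the block/sigsuspend idiom. The sketch below reproduces the pattern with signals in place of interrupting messages, deliberately raising the signal in the window between blocking and waiting to show that the wakeup is not lost. It assumes a single-threaded process; wait_for_condition and on_message are names chosen for the example.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <signal.h>
#include <string.h>

static volatile sig_atomic_t condition_met;

static void on_message(int signo)
{
    (void)signo;
    condition_met = 1;
}

/* Returns the number of times the wait was woken, or -1 on error. */
int wait_for_condition(void)
{
    struct sigaction sa;
    sigset_t msg_signal, saved, while_waiting;
    int wakeups = 0;

    memset(&sa, 0, sizeof sa);
    sa.sa_handler = on_message;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGUSR1, &sa, NULL) != 0)
        return -1;

    sigemptyset(&msg_signal);
    sigaddset(&msg_signal, SIGUSR1);
    condition_met = 0;

    sigprocmask(SIG_BLOCK, &msg_signal, &saved);  /* like dpblock()   */
    while_waiting = saved;
    sigdelset(&while_waiting, SIGUSR1);

    raise(SIGUSR1);   /* "message" arrives before the wait begins     */

    while (!condition_met) {
        /* Atomically enable delivery and sleep, as dppause() does;
         * the previous mask is restored when the call returns. */
        sigsuspend(&while_waiting);
        wakeups++;
    }
    sigprocmask(SIG_SETMASK, &saved, NULL);       /* like dpunblock() */

    assert(condition_met);
    return wakeups;
}
```

Without the initial block, a signal arriving after the test of the condition but before the wait would be handled too early and the process could sleep indefinitely.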

3.4  Restrictions and Application Front 
Ends

Except for processes that use dpjoin() to 
join a DP process group, standard input/output/error 
are not available to the DP application. Thus 
dpjoin() is essential if interactive programs are 
desired. Message-catching functions may not call 
dprecv() in blocking mode.

Timing. All system calls and standard subroutines 
that are implemented using the Unix alarm system 
call (or its variants) are not allowed because they 
would interfere with DP's own reliance on this facil-
ity. That includes: sleep, alarm, ualarm. To 
restore some of this functionality to the application 
writer, there is a special DP routine, dpalarm(t,f) 
which arranges for function f to be invoked after t 
milliseconds.

Asynchronous and signal-driven i/o. Using the 
BSD select() system call or making use of the 
SIGIO signal is forbidden.

Exec and fork. Use of any of the exec variants is 
forbidden, unless used in conjunction with fork() 
or after dpexit() has been called. The fork() 
system call can be used provided that the children do 
not attempt to partake in the execution of DP rou-
tines. Child processes (but not the parent) may do 
execs.

Application front ends. These restrictions might ini-
tially seem daunting to the application writer. How-
ever, it is always possible for non-DP processes, 
such as one intended to support an event-driven user 
interface front end, to fork a child process which 
uses dpjoin() to become a DP process or even 
which uses dpinit() to become a DP primary pro-
cess. The non-DP parent and the DP child can com-
municate using pipes or SysV IPC.
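
The plumbing for such a front end is ordinary Unix. The following sketch shows one possible arrangement with a pair of pipes; child_work() is a placeholder for whatever the DP child would compute, and the point at which a real child would call dpjoin() or dpinit() is marked by a comment. None of the names are part of DP.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Placeholder for the work the DP child would do on behalf of the
 * front end (hypothetical). */
static int child_work(int request)
{
    return request * 2;
}

/* Front-end side: fork a back-end child, send it one request through
 * a pipe and read one reply. Returns the reply, or -1 on failure. */
int ask_backend(int request)
{
    int to_child[2], from_child[2], reply = -1;
    pid_t pid;

    if (pipe(to_child) != 0)
        return -1;
    if (pipe(from_child) != 0) {
        close(to_child[0]);
        close(to_child[1]);
        return -1;
    }

    pid = fork();
    if (pid == 0) {                       /* child: the would-be DP process */
        int req, ans;

        close(to_child[1]);
        close(from_child[0]);
        /* A real child would call dpjoin() or dpinit() here. */
        if (read(to_child[0], &req, sizeof req) != (ssize_t)sizeof req)
            _exit(1);
        ans = child_work(req);
        if (write(from_child[1], &ans, sizeof ans) != (ssize_t)sizeof ans)
            _exit(1);
        _exit(0);
    }

    close(to_child[0]);
    close(from_child[1]);
    if (pid > 0) {                        /* parent: the non-DP front end */
        if (write(to_child[1], &request, sizeof request)
                == (ssize_t)sizeof request
            && read(from_child[0], &reply, sizeof reply)
                != (ssize_t)sizeof reply)
            reply = -1;
        waitpid(pid, NULL, 0);
    }
    close(to_child[1]);
    close(from_child[0]);
    return reply;
}
```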

4.   Examples 

A simple example: primes. The primes program, 
shown below, illustrates the use of the DP interface. 
The primary process uses dpcatchmsg() to 
arrange for fcatch() to be invoked in the event of 
an interrupting message and then spawns two pro-
cesses for every available host, sending a semantic 
packet containing just the DPID of the primary to 
each secondary process. It divides the interval 
1..100000 equally among all the processes, including 
itself and then uses dpwrite() to send the lower 
bound of each subinterval to each process in a reli-
able, non-interrupting message (DPREL|DPRECV). 
The primary then searches for primes in its own sub-
interval.

Meanwhile, the secondary processes have 
started and, having received their lower bound by 
calling dprecv(), they too start searching for 
primes in their own subintervals. Both secondary and 
primary processes invoke newprime() when a 
prime number is found. For the primary, newprime 
just adds the prime to the set of primes- this is a 
critical section because an interrupting message may 
access the same set and so must be protected with 
dpblock() and dpunblock(). For the secondaries, 
dpwrite() is used to send the prime number in 
a reliable, interrupting message (DPREL|DPGETMSG). 
The arrival of these messages causes fcatch() 
to be invoked, and the incoming prime number to be 
stored in the set of primes.

To let the primary know that no more primes 
are forthcoming, secondaries send a negative integer 
in a reliable interrupting message and then exit. The 
primary waits till it has received the appropriate 
number of such messages and then exits.





#include	<stdio.h>
#include	<dp/dp.h>
struct semstr {		/* most programs would have  	*/
	DPID	s_id;	/*  other fields here as well     */
} s, *sp;

#define	MAXPRIMES	100000

int	p[MAXPRIMES], np=0, IsPrimary, nprocs, nhosts, done=0,
	interval, myhostid;

#define	RelInt		(DPREL|DPGETMSG)
#define	RelNonInt	(DPREL|DPRECV)

void
sendint(DPID *dest, int i, int mode) {
	dpwrite(dest, &i, sizeof(i), mode);
}

void
newprime(int n) {
	if (IsPrimary) {
		dpblock();	/* potential race condition */
		p[np++] = n;	/*    so block interrupts   */
		dpunblock();
	} else
		sendint(&sp->s_id, n, RelInt);	/* sendint() takes a DPID pointer */
}

void
fcatch() {
	int	v;
	DPID	src;
	while (dpgetmsg(&src, &v, sizeof(v)) != DPNOMESSAGE)
		if (v<0)
			done++;
		else
			p[np++] = v;
}

void
search(int n1, int n2) {
	int	i;

	for (i=n1; i<=n2; i++)
		if (IsPrime(i))
			newprime(i);
}

void
primary(char *prog) {
	int	i=1, v=0;
	DPID	child;
	FILE	*fp;

	dpcatchmsg(fcatch);
	dpgetpid(&s.s_id);
	while (i<nprocs) {
		dpspawn(prog, &child, i%nhosts, &s, sizeof(s));
		sendint(&child, v, RelNonInt);
		v+=interval;
		i++;
	}

	search(v,MAXPRIMES);
	done++;
	dpblock();	 /* potential race condition: so  */
	while (done<nprocs) /*  block interrupts        */
		dppause(0L, NULLFUNC);
	dpunblock();

	output("results", p, np	);	/* write primes to results */
	dpexit("You're fired!");
}

void
secondary() {
	int	v;
	DPID	src;

	dprecv(&src, &v, sizeof(v), DPBLOCK);
	search(v, v+interval-1);	/* next subinterval starts at v+interval */
	sendint(&sp->s_id, -1, RelInt);	/* negative value: no more primes */
	dpexit("I quit!");
}

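int
main(int ac, char *av[]) { /* main: executed by all processes	*/
	int	size;		/* semantic packet size; -1 in the primary */

	nhosts = dpinit(av[0], &sp, &size, &myhostid);
	IsPrimary = size==(-1);
	nprocs = 2*nhosts;	/* 2 processes per host		*/
	interval = MAXPRIMES/(nprocs+1);
	if (IsPrimary)
		primary(av[0]);	/* primary() needs the program name for dpspawn() */
	else
		secondary();
	return 0;		/* not reached: both paths end in dpexit() */
}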

Capacitated warehouse location problem, again. 
A branch-and-bound search for solutions can be efficiently 
parallelized using DP. N processes are created, 
N being determined by available hardware. The 
primary maintains a set of unsearched subtrees; initially 
this set is taken from the top N subtrees of the 
search tree. When a secondary process becomes idle, 
it sends a reliable interrupting message to the primary 
requesting a subtree and then waits until it 
receives one. When the primary's set of unsearched 
subtrees falls below a low-water mark, it sends reli-
able interrupting messages to all the active secondar-
ies, requesting that they split their subtree at the next 
convenient point. These will continue to split their 
subtrees, sending (in reliable interrupting messages) 
the split-off branches to the primary to replenish its 
set, until the primary, having passed a high-water 
mark, sends them reliable interrupting messages to 
desist. This is very effective load-balancing. The 
availability of interrupting messages here is essential 
because of the unpredictability of need and availability 
of search subtrees on the one hand and the 
undesirability of frequent polling on the other.

An important element of the branch-and-bound 
search algorithm is the ability to prune search sub-
trees when the best extremum the subtree can offer is 
inferior to the best extremum already encountered. In 
a shared memory environment, all processes have 
memory access to the best extremum but in a mes-
sage-passing network environment making sure this 
information is rapidly available to all processes is 
both necessary and non-trivial. Using DP, this prob-
lem is efficiently addressed as follows. Whenever a 
secondary discovers what appears to be a new best 
extremum it sends a reliable interrupting message to 
the primary, which multicasts this in lightweight 
(unreliable) messages to all the other secondaries. 
Making these messages interrupting guarantees that 
the information will become available to the receiv-
ing process as quickly as the underlying system per-
mits. Using lightweight messages ensures that the 
multicasts will not overload the system nor overburden 
the primary. The cost of occasionally losing 
such a message is minor: it simply means that occa-
sionally for some, usually small, duration, a second-
ary may not be pruning its subtrees as effectively as 
it would otherwise.
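
A minimal sketch of why a lost update is harmless, assuming a
minimization problem: each secondary keeps its own copy of
the best value seen so far and only ever improves it. The
names best_known, note_extremum() and can_prune() are
hypothetical; in a DP program note_extremum() would
presumably be called from the message-catching function.

```c
#include <assert.h>
#include <float.h>
#include <stdbool.h>

/* Local copy of the best (smallest) extremum this process knows of. */
static double best_known = DBL_MAX;

/* Apply a candidate value, found locally or carried by an
 * interrupting message. Returns true if it improved the local copy. */
bool note_extremum(double candidate)
{
    assert(candidate == candidate);   /* reject NaN */
    if (candidate < best_known) {
        best_known = candidate;
        return true;
    }
    return false;
}

/* A subtree can be pruned when the best value it could possibly
 * offer is inferior to the best value already encountered. */
bool can_prune(double subtree_lower_bound)
{
    return subtree_lower_bound > best_known;
}
```

Because the local copy only moves toward better values, a
missed multicast merely leaves it stale for a while: the
process prunes less than it could, but never prunes wrongly.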

5.   Implementation 

DP is implemented using the socket system 
call interface to the TCP and UDP services of the 
TCP/IP protocol suite, basic Internet services such as 
ping and rexec and, of course, a host of Unix services. 
Once the basic implementation issues were 
decided, all of these services were used in the obvious 
way.

Communication mechanism. The first issue to be 
decided was how inter-process communication is to 
be handled. PVM and many other environments use 
TCP. This is a very attractive choice given that much 
inter-process communication has to be reliable and 
that TCP handles this within the OS kernel. Building 
a reliable service using UDP requires duplicating 
much of this outside the kernel with all the context-
switching cost that this implies. Nevertheless, DP's 
inter-process communication is almost entirely 
implemented using UDP. The reasons for this are:

·	TCP requires maintenance of connections and 
Unix (and presumably most systems) places 
limits on the number of connections that a 
process can maintain. The choice then is to 
take down and recreate connections as needed 
(too expensive), limit the number of processes 
with which a process can communicate 
(clearly unacceptable), or implement a routing 
mechanism. This would have to be done out-
side the kernel, negating in part the purpose of 
using TCP in the first place, especially when 
large numbers of processes are involved.

·	Efficient as TCP is, sending a UDP packet 
into the ether is cheaper, and because the 
design of DP was predicated on the desirabil-
ity of low-cost unreliable messages in many 
cases, it seemed a shame to pay more than 
necessary for that kind of communication.

·	The fault-tolerant mechanism, described 
below, is greatly simplified using UDP rather 
than TCP. This was not a reason at the outset 
of this project for using UDP, but it became a 
reason for being very glad about the choice!

Using UDP does require DP to guarantee reli-
able sequenced delivery of those messages that 
require this service. In the current implementation, 
reliable messages are implemented in the most naive 
way (with sequence numbers, positive acks, time-
outs, retransmits and a notion of "stale" messages). 
More sophisticated implementations are certainly 
possible and in the still "gray" part of the DP inter-
face, there are calls that allow the programmer to 
adjust protocol parameters such as timeout size.

Process identification. It would have been desirable 
for DPIDs to be integers or some other basic C type. 
However, that requires some kind of id-to-address 
mapping internal to DP. A problem arises when an 
application DP process references a DPID for which 
its own DP runtime support does not have a map-
ping. This could and does arise when DPIDs are sent 
in application messages. To eliminate any need for a 
centralized or distributed id resolution mechanism, 
DPIDs are not integers but 28-byte structures con-
taining all the information needed to address the cor-
responding process and more. The additional 
information represents "the kitchen sink". Some of it, 
in retrospect, has turned out to be useful; other 
components (indicating what protocol, for example 
IPX, is involved) may never find a use.

There are parallel methods which naturally 
assign integers to a set of processes and use these 
assignments in their algorithms. As it turns out, the 
use of a non-basic type does not pose much of a 
problem in those cases. Programs which use such 
methods do not spawn processes dynamically but 
rather use a fixed number of processes created from 
the outset. The need to create an initial DPID-to-inte-
ger map is only a minor inconvenience to the appli-
cation writer, and there are libraries, such as stdDP 
[Arnow, 94] built on top of DP that provide this ser-
vice, along with others. From an esthetic point of 
view the chief regret with this choice is the necessity 
for providing a dpidmatch() function. On the 
other hand, from an implementation point of view, 
things are greatly simplified.
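
The initial DPID-to-integer map mentioned above could be
built along the following lines. This is a sketch, not the
stdDP implementation: RankTable, rank_of() and MAXPROCS are
hypothetical, and id_match() is a local stand-in for
dpidmatch() that assumes byte-wise equality, which the real
function may or may not use.

```c
#include <assert.h>
#include <string.h>

#define DPIDSIZE 28
typedef struct { char dpid_info[DPIDSIZE]; } DPID;  /* as in the appendix */

/* Stand-in for dpidmatch(); byte-wise comparison is an assumption. */
static int id_match(const DPID *a, const DPID *b)
{
    return memcmp(a->dpid_info, b->dpid_info, DPIDSIZE) == 0;
}

#define MAXPROCS 64   /* hypothetical fixed process count */

typedef struct {
    DPID ids[MAXPROCS];
    int  count;
} RankTable;

/* Return the integer assigned to id, assigning the next free one
 * if id has not been seen before. Returns -1 if the table is full. */
int rank_of(RankTable *t, const DPID *id)
{
    int i;
    assert(t != NULL && id != NULL);
    for (i = 0; i < t->count; i++)
        if (id_match(&t->ids[i], id))
            return i;
    if (t->count == MAXPROCS)
        return -1;
    t->ids[t->count] = *id;
    return t->count++;
}
```

A linear search is adequate here because the set of
processes is fixed and small when the map is built.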

Process creation and initialization. Processes are 
created using the rexec service. In order to spawn a 
process, the creator, after checking the argument to 
dpspawn() as best as possible, forks a child pro-
cess which does most of the work. The child process 
calls rexec and uses the resulting TCP connection to 
deliver the semantic packet and the internal hosts 
table to the newly spawned process. It uses the same 
connection to receive the new process's DPID 
(which contains, among other data, its UDP address). 
This is the only use of TCP in the implementation. 
The parent receives the DPID from the child through 
a pipe and waits for the child process to complete the 
handshaking with the new process and disappear, 
along with its TCP connection. This arrangement 
avoids the need for a separate call by the parent to 
recognize the completion of the process creation. 
Although it compels the parent to wait until the new 
process is created, the creator is still able to receive 
and respond to interrupting messages, an allowance 
which is made much easier by having a child process 
do most of the work.
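
The fork-and-pipe arrangement can be sketched with plain
POSIX calls. Everything DP-specific is replaced by
stand-ins: FakeId plays the role of the DPID, do_handshake()
replaces the rexec call and TCP exchange, and
spawn_via_child() is a hypothetical name, not the body of
dpspawn().

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

typedef struct { char info[28]; } FakeId;   /* stand-in for a DPID */

/* Stand-in for the child's real work (rexec, handshake with the
 * new process). Here it just fabricates an id. */
static void do_handshake(FakeId *out)
{
    int i;
    for (i = 0; i < 28; i++)
        out->info[i] = (char)('A' + i % 26);
}

/* Fork a helper child, read the new id from it through a pipe and
 * wait for it to disappear. Returns 0 on success, -1 on failure. */
int spawn_via_child(FakeId *newid)
{
    int fd[2], status;
    pid_t pid;
    ssize_t n;

    assert(newid != NULL);
    if (pipe(fd) < 0)
        return -1;
    pid = fork();
    if (pid < 0) {
        close(fd[0]);
        close(fd[1]);
        return -1;
    }
    if (pid == 0) {                       /* child: does most of the work */
        FakeId id;
        close(fd[0]);
        do_handshake(&id);
        n = write(fd[1], &id, sizeof id);
        _exit(n == (ssize_t)sizeof id ? 0 : 1);
    }
    close(fd[1]);                         /* parent: receive id, then reap */
    n = read(fd[0], newid, sizeof *newid);
    close(fd[0]);
    if (waitpid(pid, &status, 0) < 0)
        return -1;
    if (n != (ssize_t)sizeof *newid)
        return -1;
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

A real implementation that must stay responsive to
interrupting messages would also have to restart read() and
waitpid() when they are interrupted by a signal; the sketch
omits that.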

Reliability. The scheme for enhancing reliability is 
inspired by one used, in a different context, in the 
early 1980s in the design of a fault-tolerant Unix box 
based on a shared memory architecture [Borg83]. 
Each active process is created with a backup process 
residing on a different workstation. The workstation 
housing a backup must be binary compatible with 
that executing the active process. Furthermore, 
because of the way recovery is implemented, each 
pair of active and backup processes must share some 
file address space in common (though between two 
distinct pairs there is no such need). The scheme only 
guarantees against single workstation failure, though 
it may work in the event of multiple failures.  What 
it requires is that the workstation holding the backup 
process not fail. So in the figure below, had the 
backup of process C been executing on an additional 
machine, say Sparc#4, rather than on Sparc#2, then 
both Sparc#1 and Sparc#2 could have failed simulta-
neously.

When process A sends a message to process B, 
process B uses the message and sends a copy of the 
message to its backup, B'. If the message is a reliable 
message, B' sends the acknowledgment to A. Thus A 
continues to retransmit in the usual way until it is 
certain that both B and B' have the message. Redun-
dant transmits to B cause no problem because they 
are simply stale messages which an ack/timeout/
retransmit protocol would ignore anyway. B' saves 
all messages (until, as described below, a checkpoint 
operation takes place). Upon detection of failure (see 
below), B' starts executing. Since it has all the mes-
sages (with the possible exception of a few unreli-
ables) that B received, its execution will be identical. 
It will send output messages that are redundant to 
ones sent by B previously, but these will be treated as 
stale by their recipients and not cause any inconsis-
tency.

To preserve the total order of messages as B 
received them, B assigns each newly received mes-
sage an internal sequence number. This number is 
passed on with every copy of that message that B 
sends to B'.

Failure is detected in an ad hoc fashion. Process 
B periodically sends dummy messages to its 
backup, B'. If B' does not hear from B after a time, 
it pings B's machine. If there is no response after a 
number of tries, B' takes over from B.

Recovery begins with B' sending a recovery 
message to B's parent and every process that B has 
interacted with. Processes receiving such messages 
revise their internal DP process tables appropriately 
and propagate these messages to all other processes 
they in turn have communicated with. The applica-
tion code layer never learns about this and will con-
tinue to use the original DPID of B which will now 
be mapped to that of B' by the DP implementation. 
Delay in this propagation of recovery messages 
poses no problem because messages sent to B will 
remain unacknowledged and hence be retransmitted, 
eventually, to B'. Furthermore, even if B has only 
failed temporarily (the transceiver cable fell out, say) 
and comes back to life in the middle of the recovery 
there still will not be a problem since it always for-
wards any messages that it receives to B'. Any mes-
sages that a temporarily reincarnated B would send 
will either be stale or cause the equivalent B' mes-
sage to be treated as such.

The transmission of recovery messages by B' 
is followed by the roll-forward phase. To avoid the 
delay that would result from having to roll-forward 
from scratch, active processes periodically (and 
transparently to the application) have checkpoints, 
where DP writes out the process's entire stack and 
data segment to disk. (This is why the active/backup 
pair must have some file space in common.) At the 
outset of the roll-forward phase of recovery, these 
segments are copied into the address space of pro-
cess B', so the roll-forward starts from the last 
checkpoint. These checkpoints are done quite infrequently, 
roughly every 10-15 minutes. The rationale 
is that anyone running a computation on a group of 
workstations in which one fails should be grateful to 
have a recovery delay of only 15 minutes. Fault 
tolerance is meant for computations that run for 
hours, not minutes.

To reduce excess message traffic resulting 
from redundant messages sent by B' in its roll-for-
ward phase, the active process keeps B' informed as 
to the number of messages it has sent out to each pro-
cess. During roll-forward, these counts are decre-
mented and no messages are actually sent out to a 
given process until its corresponding count has 
reached 0.

This scheme necessarily requires a number of 
restrictions on the activities of the active processes. 
One severe restriction is that they cannot be doing I/O 
other than DP message transmission (or if they do 
I/O, its integrity can't be guaranteed).

6.   Performance 

A number of comparisons of DP's performance 
with that of PVM have been made. One test involves 
a ring of processes passing a single message from 
one to the other. Another involves a set of processes, 
each of which is sending and receiving messages 
symmetrically to all the others.

In each test, a DP and an equivalent PVM pro-
gram were run simultaneously, in the same environ-
ment (same machines, same network, same 
directories, etc.). Each program was given a timeout 
value and the number of messages passed at that 
point was measured. Table 1 shows the ratio of messages 
sent in the DP program to those sent in the PVM program.

   TABLE 1. Comparing DP and PVM
   Processes	DP/PVM: ring	DP/PVM: set
   8		1.33		0.82
   16		1.34		0.88
   64		1.23		0.81
   100		1.36		0.90
   112		1.37		1.78

These results suggest DP performs comparably 
to PVM, that DP may scale better (probably because 
it does not rely on TCP connections) and that a more 
thorough performance study is desirable.

7.   Portability 

DP is implemented on SunOS, Solaris, AIX 
and on DEC RISCstations. Earlier versions were 
implemented on the Alliant FX-8 and the KSR-1. A 
Windows-NT implementation is underway as is a 
port to NetBSD.

Apart from its reliability enhancement, DP is 
very undemanding of the underlying system. It 
requires the ability to spawn remote processes, send 
messages without blocking and have interrupt-driven 
input. The reliability enhancement described above 
has been implemented on SunOS only. Presumably it 
could be carried out in most Unix environments.

8.   Retrospect and Prospects

There seems to be a gap between system 
designers and application programmers in the area of 
parallel distributed programming. In this project, I 
started out wearing an application programmer hat. I 
had a set of requirements. There was no library then 
that came close to meeting them and even now no 
other library meets all of them. At the outset, I had 
no plan, for example, to provide dynamic process 
creation. As soon as I wore the system designer hat 
for a while, that seemed to be a great weakness in 
the design. It seemed that the flexibility to start with 
only a few processes and then as new tasks are iden-
tified, create additional ones is crucial. Ideally, the 
programmer would design the process structure of 
the application to mirror the logical task structure of 
the problem. After taking the trouble to provide this 
facility, I was quite chagrined to find that most of the 
DP users simply assess the number of machines that 
they have available, choose a number of processes 
about twice that number, and let them run, using a 
"worker parallelism" paradigm, in which worker pro-
cesses are given or pick up tasks as they become 
idle. The reason for this is understandable. Available 
hardware is the determining factor in the plans of 
these practitioners. As I turned to using DP myself, I 
found I was doing the same thing. Is dynamic pro-
cess creation really worth the trouble?

On the other hand, the interrupting message 
facility and the unreliable messages are used exten-
sively. The former in particular has been seen to sim-
plify the parallelization of existing code, by 
eliminating the need for finding the places in the 
code to put receives. The interrupting message han-
dler takes care of that. Message sends still need to be 
inserted into the existing code, but somehow it is 
easier to identify the points where there is a result to 
brag about (to other processes say) than to identify 
receives. In cases where the reverse is true, then 
requests for data can be placed at the appropriate 
points (send request in an interrupting message, wait 
for response) and the actual sending of results can be 
driven by the arrival of these interrupting messages.

The implementation of single processor fault-
tolerance invites an effort to undertake process 
migration and load balancing. Whether the admit-
tedly heavy-handed fault-tolerant scheme used here 
is efficient enough for that remains to be seen.

9.   Availability

DP runs on Sun SPARCstations, DEC RISCs-
tations and on IBM RS/6000s with C and Fortran 
interfaces. It is, along with documentation and some 
utilities, available from the author.

10.   Acknowledgments

While an undergraduate at Harvard, Haibin Jiu 
spent two of his summers assisting in this effort. Jim 
Basney, a student at Oberlin, spent a "winter term" 
on this as well. I especially would like to acknowl-
edge the work of Jerry Chen, who while working on 
his doctorate at CUNY implemented an early version 
of DP on the KSR-1 and with whom I have had 
many valuable conversations.

11.   References

Andrews, G.R.: The distributed programming language 
SR - mechanisms, design and implementation. Software - 
Practice and Experience 12,8 (Aug. 1982).

Arnow, D.M.: Correlated Random Walks in Distributed 
Monte Carlo Programs. ICIAM 91, Washington D.C. 
(July 1991).

Arnow, D.M.: StdDP - a layered approach to distributed 
programming libraries. T.R. 94-11 Dept. of CIS, 
Brooklyn College (1994).

Arnow, D.M., McAloon, K.M. and Tretkoff, C.: Distrib-
uted programming and disjunctive programming. 
Proceedings of the Sixth IASTED-ISMM Int. Conf. on 
Parallel And Distributed Computing And Systems 
Washington D.C. (October 1994).

Arnow, D.M, McAloon, K.M., and Tretkoff, C.: Parallel 
integer goal programming. To appear in the 23rd ACM 
Computer Science Conference, Nashville. (1994).

Auerbach, J., Kennedy, M., Russell, J., and Yemini, S.: 
Interprocess communication in Concert/C. T.R. RC 
17341, IBM Watson Research Center, Yorktown 
Heights, (1992).

Bal, H.E., Steiner, J.G., and Tanenbaum, A.S.: Programming 
languages for distributed computing 
systems. Computing Surveys 21,3 (Sept. 1989).

Borg, A., Baumbach, J., and Glazer, S.: A message system 
supporting fault tolerance. 9th ACM Symp. on 
Operating Systems Principles. Bretton Woods, New 
Hampshire, (Oct. 1983).

Butler, R., and Lusk, E.: User's guide to the P4 pro-
gramming system. Tech. Rep. ANL-92/17, Argonne 
Nat. Lab. (1992).

Chen, J.: Distributed Green's function Monte Carlo cal-
culations. Ph.D Thesis, Dept. of CS, CUNY (1994).

Clark, K.L.: PARLOG and its applications. IEEE Trans-
actions on Software Engineering SE-14, 12 (Dec. 
1988).

Douglas, Craig C., Mattson, Timothy G., and Schultz, 
Martin H.: Parallel programming systems for work-
station clusters. Yale University Dept of CS Technical 
Report, (Aug., 1993).

Geist, G.A. and Sunderam, V.S.: PVM - Network-based 
concurrent computing on the PVM system. 
Concurrency: Practice and Experience 4(4) (Jun., 
1992).

Gelernter, D.: Generative communication in Linda. 
ACM Transactions on Programming Languages and 
Systems 7, 1 (Jan. 1985).

Goldberg, Arthur P.: Concert/C Tutorial: An Introduc-
tion to a Language for Distributed C Programming, 
IBM Watson Research Center, Yorktown Heights, 
(Mar., 1993).

Hoare, C.A.R.: Communicating sequential processes. 
Communications of the ACM 21,8 (Aug. 1978).

Marsland, T.A., Breitkreutz, T., and Sutphen, S.: A net-
work multi-processor for experiments in parallelism. 
Concurrency: Practice and Experience, 3(1), (1991).

MPI Forum: Message Passing Interface Standard 
(Draft). Oak Ridge National Laboratory. (Nov. 
1993).

Parsons, I.: Evaluation of distributed communication 
systems. Proceedings of CASCON '93, Vol 2. Tor-
onto, Ontario, Canada (Oct. 1993)

Strom, R.E. and Yemini, S.: NIL: An integrated language 
and system for distributed programming. 
SIGPLAN Notices 21, 10 (Oct. 1983).

Sunderam, V.S.: PVM - A framework for parallel distributed 
computing. Concurrency: Practice and 
Experience 2 (1990).

12.   Author Information

David Arnow is an Associate Professor in the 
Department of Computer and Information Science at 
Brooklyn College. He received an A.B. in Chemistry 
from Oberlin College in 1973 and a Ph.D. in Com-
puter Science from NYU in 1981. His interests, as 
one might gather, include parallelizing scientific and 
operations research programs in a distributed envi-
ronment. He is also deeply concerned with teaching 
Computer Science to undergraduates, regardless of its 
cruelty. His address is arnow@sci.brooklyn.cuny.edu.

13.   Appendix: The DP interface header:

#include	<sys/types.h>
/* SEND MODES FOLLOW */
#define DPRECV		0x00	/* peer can get this msg via dprecv	*/
				/* peer's catching function will not be	*/
				/*     invoked 				*/

#define DPGETMSG	0x01	/* peer can get this msg via dpgetmsg	*/
				/* peer's catching function (if set	*/
				/*   with dpcatchmsg) will be invoked.	*/

#define DPREL		0x00	/* guaranteed delivery of msg to peer	*/
				/*   dp level				*/

#define DPUNREL		0x02	/* good luck: no guarantee of delivery.	*/

/* RECV FLAGS FOLLOW */
#define	DPBLOCK		0x00	/* Wait until message arrives		*/
#define	DPNOBLOCK	0x01	/* Don't wait				*/

/* RETURN CODES FOLLOW */
#define DPSUCCESS	0
#define DPFAIL		(-1)
#define	DPNOMESSAGE	(-2)
#define	DPDESTDEAD	(-3)

typedef struct dpdstr {
	char		*dpd_data;	/* point to message data	*/
	int		dpd_len;	/* # bytes of "      "		*/
	struct	dpdstr	*dpd_next;	/* next node in message		*/
} DPDNODE;
/*typedef struct dpdstr DPDNODE; */

#define	DPHOSTSIZE	516
typedef struct {		/* host info available to the app.	*/
	char	dph_info[DPHOSTSIZE];
} DPHOST;

#define DPIDSIZE	28
typedef struct {		/* the identification of a dp process	*/
		char	dpid_info[DPIDSIZE];
	} DPID;

typedef	DPID		*DPIDP;
typedef	void		(*FUNCPTR)();
#define	NULLFUNC	((FUNCPTR) 0)

int dpinit();	/* dpinit(char *prog, char **semp, int *size, *hostid)	*/
		/* STORES BACK: pointer to semantic packet		*/
		/* STORES BACK: size of semantic packet			*/
		/* STORES BACK: my integer host id			*/
		/* RETURNS: DPFAIL or # of available hosts		*/

int dpaddhost(); /* dpaddhost(char *hstn, *dmnn, *path, *user, *paswd)	*/
		 /* RETURNS:  number of hosts in host table		*/

void dpgethost();	/* dpgethost(int hid, DPHOST *hptr);		*/
			/* STORES BACK: host info for host #hid		*/

int dpsend();	/* dpsend(DPID *dest, DPDNODE *data, int mode)		*/
		/* RETURNS: DPSUCCESS or DPFAIL or DPDESTDEAD		*/

int dpwrite();	/* dpwrite(DPID *dest, char *data, int nbytes, int mode) */
		/* RETURNS: DPSUCCESS or DPFAIL or DPDESTDEAD		*/

int dprecv();	/* dprecv(DPID *src, char *data, int limit, int flags)	*/
		/* RETURNS: DPSUCCESS or DPFAIL or DPNOMESSAGE		*/

int dpgetmsg();	/* dpgetmsg(DPID *src, char *data, int limit)		*/
		/* RETURNS: DPSUCCESS or DPFAIL or DPNOMESSAGE		*/

void dpexit();	/* dpexit(char *exitstrng): normal exit-- just this one	*/
void dpstop();	/* dpstop(char *stopmsg): panic exit-- stop all dp	*/

void dpgetpid();/* dpgetpid(DPID *myid)					*/
		/* STORES BACK: dp process id of executing process	*/

void dpcall();	/* dpcall(long t, FUNCPTR f)				*/
		/* set alarm for user's function f			*/

void dppause();	/* dppause(long t, FUNCPTR f)				*/
		/* set alarm for user's function f and pause		*/

int dpblock();	/* disable interrupts					*/
void dpunblock();/* enable interrupts					*/

int dpspawn();	/* dpspawn(char *prog, DPID *newid, int hid, \		*/
		/*	   char *semantic, int size, int sendflag)	*/
	   	/* spawn a process prog on host #hid, giving it the	*/
	 	/* semantic packet indicated; STORES BACK the id of	*/
		/* the new process and returns DPSUCCESS or DPFAIL	*/

int dpinvite();	/* dpinvite(char *path, *semantic, int size)		*/
		/* set up invitation for processes to become DPized	*/
		/* RETURNS:  DPSUCCESS OR DPFAIL			*/

int dpjoin();	/* dpjoin(char *path, **semp, int *semsize, *hostid)	*/
		/* join up as a DP process				*/
		/* parameters similar to dpinit()			*/

FUNCPTR dpcatchmsg();	/* dpcatchmsg(FUNCPTR f)			*/
			/* RETURNS NULLFUNC or pointer to previous	*/
			/*	catching function			*/

void dpsetexfun();	/* dpsetexfun(FUNCPTR f)			*/
			/* set a function to be called when exiting	*/

void dphtonid();	/* dphtonid(DPID *nid, *hid)			*/
			/* converts host byte-ordered hid to network	*/
			/* byte order and store it in nid		*/

void dpntohid();	/* dpntohid(DPID *hid, *nid)			*/
			/* converts network byte-ordered nid to host	*/
			/* byte order and store it in hid		*/

int dpidmatch();	/* dpidmatch(DPID *id1, DPID *id2)		*/
			/* returns true if id1 and id2 are equal	*/
			/* returns false otherwise			*/
#define dontohl(n)	((long) ntohl((u_long) (n)))	/* signed #s	*/
#define dohtonl(n)	((long) htonl((u_long) (n)))
#define dontohs(n)	((short) ntohs((u_short) (n)))
#define dohtons(n)	((short) htons((u_short) (n)))
#define dohtoni(n)	( (int) ( (sizeof(int) == sizeof(short))? dohtons(n):dohtonl(n)))
#define dontohi(n)	( (int) ( (sizeof(int) == sizeof(short))? dontohs(n):dontohl(n)))
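
The DPDNODE type in the header above describes a message as
a chain of data segments. As an illustration of how such a
chain might be consumed, the sketch below copies one into a
single contiguous buffer. flatten_message() is a
hypothetical helper, not part of DP, and the assumption that
the segments are meant to be concatenated in list order is
mine rather than something the header states.

```c
#include <assert.h>
#include <string.h>

typedef struct dpdstr {
    char          *dpd_data;   /* points to message data   */
    int            dpd_len;    /* number of bytes of data  */
    struct dpdstr *dpd_next;   /* next node in the message */
} DPDNODE;

/* Copy every segment of msg, in order, into buf. Returns the total
 * number of bytes copied, or -1 if the message does not fit within
 * limit bytes or a segment length is negative. */
int flatten_message(const DPDNODE *msg, char *buf, int limit)
{
    int total = 0;
    assert(buf != NULL && limit >= 0);
    for (; msg != NULL; msg = msg->dpd_next) {
        if (msg->dpd_len < 0 || msg->dpd_len > limit - total)
            return -1;
        memcpy(buf + total, msg->dpd_data, (size_t)msg->dpd_len);
        total += msg->dpd_len;
    }
    return total;
}
```

A chained representation of this kind lets a caller prepend
a header to existing data without first copying the data
into a new buffer.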

