Time management in distributed simulation

Introduction

Much of my work involves discrete event simulation, where a simulation advances, in discrete jumps, from one “interesting” point in time (called an event) to the next.  Managing execution of such a simulation is pretty straightforward when everything happens in a single thread.  But what if multiple processes, each representing just one component of a larger simulation, want to play nicely together?  How do we control the advance of each process’s “logical time,” so that events are always handled in time order, with no one lagging behind or jumping too far ahead?  In particular– and the question that motivated this post– what is lookahead, and why does my simulation need to know about it?

My goal in this post is to describe how this works at an introductory level: the protocol for communicating events and time advances, and the constraints imposed by that protocol.  But just as writing a compiler or interpreter can improve understanding of programming languages, I think actually implementing this protocol can help to understand why those constraints exist.  And rather than stick to pseudo-code as in most of the relevant literature, I wanted to write something that actually runs.  The end result is fewer than 90 lines of Python code, included below, which may be used either as a sandbox for single-step experimenting at the interpreter prompt, or to coordinate an actual distributed simulation:

"""Conservative time management for distributed simulation."""

import collections
import heapq
from functools import reduce

federations = collections.defaultdict(dict)
epsilon_time = 1e-9

class Federate:
    """Manage communication and time constraint/regulation for a federate."""

    def __init__(self):
        """Create an "orphan" federate not joined to any federation."""
        self.federation = None

    def join(self, federate_name, federation_name, lookahead):
        """Join federation as time-constrained/regulating federate."""
        assert(self.federation is None)
        assert(lookahead > epsilon_time)
        federation = federations[federation_name]
        assert(not federate_name in federation)
        self.name = federate_name
        self.federation_name = federation_name
        self.federation = federation
        time = reduce(max, [fed.time for fed in federation.values()], 0)
        self.time = max(time - lookahead + epsilon_time, 0)
        self.requested = self.time
        self.lookahead = lookahead
        self.events = []
        self.event_tag = 0
        federation[federate_name] = self
        self.grant(self.time)

    def resign(self):
        """Resign from federation."""
        self.federation.pop(self.name)
        if self.federation:
            self.push()
        else:
            federations.pop(self.federation_name)
        self.federation = None

    def send(self, event, time):
        """Send future event in timestamp order."""
        assert(time >= self.requested + self.lookahead)
        for fed in self.federation.values():
            if not fed is self:
                heapq.heappush(fed.events,
                               (time, self.name, self.event_tag, event))
        self.event_tag = self.event_tag + 1

    def request(self, time):
        """Request advance to given future time."""
        assert(time > self.time)
        assert(self.requested == self.time)
        self.requested = time
        self.push()

    def receive(self, event, time):
        """Called when timestamp order event is received via send()."""
        print('{}.receive(event={}, time={})'.format(self.name, event, time))

    def grant(self, time):
        """Called when next event request() is granted."""
        print('{}.grant(time={})'.format(self.name, time))

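    def next_grant_time(self):
        """Return the time this federate would be granted next."""
        time = self.requested
        if time > self.time and self.events:
            # Time Advancing: an earlier queued event preempts the request.
            time = min(time, self.events[0][0])
        return time

    def advance(self):
        """Grant the pending request if no earlier event can still arrive."""
        # Earliest time stamp that any other federate could still send.
        galt = reduce(min,
                      [fed.next_grant_time() + fed.lookahead
                       for fed in self.federation.values() if not fed is self],
                      float('inf'))
        if self.next_grant_time() < galt:
            # Deliver all queued events sharing the earliest eligible time.
            while self.events and self.events[0][0] <= self.requested:
                self.requested, source, tag, event = heapq.heappop(self.events)
                self.receive(event, self.requested)
            self.time = self.requested
            self.grant(self.time)

    def push(self):
        """Try to advance every federate that is waiting for a grant."""
        for fed in self.federation.values():
            if fed.requested > fed.time:
                fed.advance()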

From one to many

Before jumping into distributed simulation, though, let’s start simple with just a single process, and consider what an event-based simulation framework might look like:

import heapq
import collections

class Simulation:
    def __init__(self):
        """Create an "empty" simulation."""
        self.time = float('-inf')
        self.events = []
        self.event_tag = 0
        self.listeners = collections.defaultdict(list)

    def publish(self, event, time):
        """Insert event into the local queue."""
        assert(time >= self.time)
        heapq.heappush(self.events,
                       (time, self.event_tag, event))
        self.event_tag = self.event_tag + 1

    def subscribe(self, event_type, listener):
        """Register listener callback for events of the given type."""
        self.listeners[event_type].append(listener)

    def run(self):
        """Run simulation."""
        while self.events:
            self.time, tag, event = heapq.heappop(self.events)
            for listener in self.listeners[event.type]:
                listener(self, event, self.time)

The idea is to maintain a priority queue of future events, sorted by time.  As the simulation is run(), we “advance” time by iteratively popping the next time-stamped event from the queue, and notifying any listeners that subscribe() to events of that type, who may in turn publish() (i.e., insert) additional future events into the queue.
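To make this concrete, here is a small self-contained run.  It repeats a condensed copy of the class so that it can be pasted into an interpreter by itself.  The Event record and the ping/pong listeners are hypothetical names of mine, since all the framework asks of an event is a type attribute.

```python
import collections
import heapq

# Hypothetical event record; the framework only needs a .type attribute.
Event = collections.namedtuple('Event', ['type', 'payload'])

class Simulation:
    """Condensed copy of the framework above, for a standalone demo."""
    def __init__(self):
        self.time = float('-inf')
        self.events = []
        self.event_tag = 0
        self.listeners = collections.defaultdict(list)

    def publish(self, event, time):
        assert time >= self.time
        heapq.heappush(self.events, (time, self.event_tag, event))
        self.event_tag += 1

    def subscribe(self, event_type, listener):
        self.listeners[event_type].append(listener)

    def run(self):
        while self.events:
            self.time, tag, event = heapq.heappop(self.events)
            for listener in self.listeners[event.type]:
                listener(self, event, self.time)

log = []

def on_ping(sim, event, time):
    """Record the ping and schedule a pong one time unit later."""
    log.append((time, event.type, event.payload))
    sim.publish(Event('pong', event.payload), time + 1.0)

def on_pong(sim, event, time):
    """Record the pong; publishes nothing further."""
    log.append((time, event.type, event.payload))

sim = Simulation()
sim.subscribe('ping', on_ping)
sim.subscribe('pong', on_pong)
sim.publish(Event('ping', 1), 0.0)
sim.publish(Event('ping', 2), 0.5)
sim.run()
print(log)
```

The log comes out in time order, with each pong landing one time unit after its ping, even though the second ping was published before the first pong existed.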

There are several things to note here:

First, your discrete event simulation may not look like this at all.  It could be process-based, or a simple batch for-loop, or whatever.  That’s okay– this is just a convenient generic starting point, so that once we start interacting with other components in a distributed simulation, we can see how to modify the publish() and run() methods in a natural way.

Second, at this point the only real constraint is that we cannot publish events in the past (see the assertion in publish()).  That is, while handling an event that occurs at a given time t, a listener is free to publish new events that occur at any time greater than or even equal to t.  We will have to restrict this freedom somewhat when participating in a distributed simulation.

Finally, because multiple events can have the same time stamp, repeatability is a concern.  We want to ensure that “ties” are broken in a deterministic, repeatable way, independent of the particular implementation of the underlying priority queue.  To do this, note that elements of the queue are actually tuples of the form (time, tag, event), with the natural lexicographic ordering.  The tag is just a monotonically increasing counter that provides such a tie-breaker.  It will eventually be useful to add a source element to tuples like this, indicating the string name of the simulation generating the event… but only once we start receiving events from other external simulation components (with different names).
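To see the tie-breaker at work, here is a small standalone sketch.  The Event class is hypothetical and deliberately has no ordering of its own, which is the typical situation: without the tag, two entries with equal time stamps would force Python to compare the events themselves, and that fails.

```python
import heapq

class Event:
    """Hypothetical event type that defines no ordering."""
    def __init__(self, name):
        self.name = name

queue = []
tag = 0
for time, name in [(2.0, 'c'), (1.0, 'a'), (1.0, 'b')]:
    # The tag breaks ties, so the Event objects are never compared.
    heapq.heappush(queue, (time, tag, Event(name)))
    tag += 1

order = []
while queue:
    time, _, event = heapq.heappop(queue)
    order.append((time, event.name))
print(order)

# Without a tag, equal time stamps fall through to comparing Events.
try:
    bad = []
    heapq.heappush(bad, (1.0, Event('a')))
    heapq.heappush(bad, (1.0, Event('b')))
    tie_is_safe = True
except TypeError:
    tie_is_safe = False
print(tie_is_safe)
```

The two events at time 1.0 come out in the order they were published, whatever the heap does internally.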

Federation of federates

In what follows, I will try to use terminology consistent with the High-Level Architecture (HLA), a rather ridiculously generic name conveying little information about what it actually is: an IEEE-standardized definition of services for managing a distributed simulation via a centralized run-time infrastructure (RTI).  But although this will be in HLA-speak, it’s worth noting that the ideas presented here are applicable to distributed simulation in general, not just to HLA in particular.

An HLA federation is a distributed simulation consisting of a collection of simulation components called federates, interacting via the RTI.  The RTI acts as both the mailman and timekeeper for the federation, coordinating the communication of events between federates and the granting of requests by each federate to advance their clocks forward in (simulated) logical time.

Simulation federates interacting via centralized RTI. (Public domain image created by Arichnad 2009, retrieved from http://en.wikipedia.org/wiki/File:RTI.svg)

Note that in the figure above, the federates never communicate directly with each other.  Everything goes through the RTI: we will assume that each federate communicates only with the RTI via its own reliable, in-order channel (e.g., a TCP socket).  This simplifies the use of the code provided here, since we can leave out the multi-threaded synchronization details of the actual inter-process communication, and instead focus on the single “main” thread in which the RTI responds to the already-serialized sequence of messages received from various federates.  (However, it is an exercise for the reader–or another post– to show that this centralization is not strictly necessary.)

Properties and states of a federate

To manage a federation of n federates from the RTI, let’s maintain the following properties of each federate:

  • t_i is the current logical time of federate i.  Note that at any given point in wall clock time, different federates may in general have different logical times.
  • r_i \geq t_i is the logical time to which federate i has most recently requested an advance.  A federate may be in one of two states: if r_i > t_i, then federate i is in the Time Advancing state.  If r_i = t_i, then federate i is in the Time Granted state.
  • \Delta_i > 0 is the lookahead for federate i; more on this shortly.
  • Q_i is the priority queue of future time-stamped events, received by the RTI from other federates, to be delivered to federate i.
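As a rough sketch of that bookkeeping, the RTI's per-federate record might look like the following.  The FederateState name and its fields are mine, chosen to mirror the symbols above; they are not part of the HLA interface.

```python
import heapq
from dataclasses import dataclass, field

@dataclass
class FederateState:
    """Hypothetical record of what the RTI tracks for one federate."""
    time: float        # t_i, current logical time
    requested: float   # r_i >= t_i, requested time
    lookahead: float   # Delta_i > 0
    events: list = field(default_factory=list)  # Q_i, heap of (time, event)

    @property
    def state(self):
        """Time Advancing while a request is pending, else Time Granted."""
        if self.requested > self.time:
            return 'Time Advancing'
        return 'Time Granted'

fed = FederateState(time=3.0, requested=3.0, lookahead=0.5)
before = fed.state
fed.requested = 5.0   # the effect of a request for time 5.0
after = fed.state
heapq.heappush(fed.events, (4.0, 'external event'))
print(before, '->', after, '| next queued event at', fed.events[0][0])
```

Note that the state is not stored separately: it follows from comparing r_i with t_i, which is also how the listing at the top of the post tells the two states apart.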

The message protocol

A federate may send four types of messages to the RTI: it can (1) join a federation, (2) exit or “resign” from a federation, (3) request a time advance, or (4) send an event to other federates:

  • join(federate, federation, lookahead) sent by a simulation component indicates a request to join a named federation (or create it if it does not yet exist) as a federate with the given name and positive lookahead.  The name of the federate will be used to deterministically order identically time-stamped events as mentioned above.  The name of the federation may be used to allow a single RTI process to service multiple federations (with distinct names) simultaneously.  A joining federate is effectively in the Time Advancing state, and must wait until it receives an initial grant() message (see below) to initialize its logical time.
  • resign() sent by a federate indicates an exit from the currently joined federation.
  • request(t) sent by federate i indicates a request to advance to future logical time t > t_i.  Federate i must currently be in the Time Granted state (i.e., r_i = t_i), and upon sending this message the federate transitions to the Time Advancing state (i.e., r_i is updated to the given future time t).  Note that the federate’s current logical time t_i remains unchanged; the federate must wait until it receives a grant() message in response from the RTI (see below).
  • send(e, t) sent by federate i indicates that event e should be sent to all other federates in the federation with time stamp t \geq r_i + \Delta_i.  (The event message is not immediately delivered, however, but is instead inserted into the RTI’s central queues Q_j for all other federates j.)  A federate is free to send these event messages in either the Time Granted or Time Advancing states; however, note that all such “externally published” events must have strictly future time stamps, as specified by the positive lookahead \Delta_i.

In the other direction, the RTI may send just two types of messages back to a federate… and will only do so when a federate is in the Time Advancing state:

  • receive(e, t) sent to federate i indicates that external event e should be effectively inserted into the federate’s local event queue with time stamp t, where t_i < t \leq r_i.  The federate may receive zero or more additional receive() event messages, all with the same time stamp, followed by:
  • grant(t) sent to federate i indicates that the federate has transitioned from the Time Advancing to the Time Granted state, with both its logical time t_i and its requested time r_i updated to the given time t, which is never later than the time originally requested.  Note that the granted time may be less than the originally requested time… but this will only be the case if one or more external events are received between the request() and the grant(), in which case the granted time will be equal to the time stamp on the received event(s).

The punch line is that, in return for federates adhering to this protocol, the RTI “promises” to maintain the following relatively simple invariant throughout execution of a federation:

A federate will only receive external events with strictly future time stamps.  That is, at all times, a federate has already encountered all possible external events with time stamps up to and including its current logical time.

The Python code above implements the RTI’s role in this protocol.  Call the Federate class methods join(), resign(), send(), and request() to “send” the corresponding messages to the RTI, and the receive() and grant() methods are callbacks indicating the corresponding response messages.
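The heart of the RTI's side of the bargain is deciding when a grant is safe.  The sketch below restates the test used by advance() in the listing at the top of the post as two standalone functions; the function names and the example federation are hypothetical.  A federate may be granted a time only if it is strictly less than the earliest time stamp that any other federate could still send.

```python
def next_grant_time(time, requested, queued_times):
    """Time a federate would be granted: its request, or an earlier queued event."""
    if requested > time and queued_times:
        return min(requested, min(queued_times))
    return requested

def grant_bound(others):
    """Strict upper bound on a safe grant, given a tuple
    (time, requested, lookahead, queued_times) for every other federate."""
    return min((next_grant_time(t, r, q) + la for t, r, la, q in others),
               default=float('inf'))

# Hypothetical federation, seen from a third federate C:
others = [
    (10.0, 10.0, 1.0, []),      # A: Time Granted at 10, lookahead 1
    (9.0, 12.0, 0.5, [11.0]),   # B: advancing to 12, event queued at 11
]
bound = grant_bound(others)
can_grant_10_5 = 10.5 < bound
can_grant_11 = 11.0 < bound
print(bound, can_grant_10_5, can_grant_11)
```

Here A could still send an event stamped exactly 11.0, so C may be granted 10.5 but must keep waiting on a request for 11.0.  The strict inequality is what keeps C from being granted a time at which an event could still show up.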

Conclusion

Coming back now to the framework for a single simulation component, we can see how to modify the event loop to participate in a distributed simulation.  Before we process the next event in our local queue, we first “peek” at its time stamp, and request advance to that logical time.  While waiting for the time advance grant, we insert any externally received events into our local queue– possibly ahead of the event time to which we initially tried to advance.

def run(self):
    """Run simulation."""
    while self.events:

        # Peek at next event time stamp in local queue.
        next_time, tag, event = self.events[0]

        # Send request() message, wait for grant().  The protocol only
        # allows requests for strictly future times, so an event at the
        # current time is handled without a new request.
        if next_time > self.time:
            self.request(next_time)
        self.time, tag, event = heapq.heappop(self.events)
        for listener in self.listeners[event.type]:
            listener(self, event, self.time)

def receive(self, event, time):
    """Insert external event into the local queue."""
    self.publish(event, time)

Note that this event loop is actually more “relaxed” than it could be.  That is, when we request() a time advance, we simply block until we get the grant(), storing any intervening receive() events into our local queue… without immediately processing any of them.  Thus, we only ever send() external events when we are in the Time Granted state, although the time management protocol supports sending when in the Time Advancing state as well.

I have obviously simplified things quite a bit here.  For example, in the actual HLA interface, there are complementary notions of time constraint (blocking for grant() callbacks) and time regulation (observing the lookahead restriction when sending events), that can be independently enabled or disabled.  A federate’s lookahead can be modified at run time.  And other time advance services are available beyond the one discussed here (typically referred to as the “Next Message Request” service), etc.  But hopefully this still serves as a useful starting point.

And I have not even mentioned what I actually find to be the most interesting aspect of this problem: a proof of correctness.  That is, how do we know that this implementation actually preserves the “Thou shalt receive only future events” invariant promised above?  In particular, why is strictly positive lookahead critical to that proof of correctness, and what goes wrong when we try to support zero lookahead?  And how can we “decentralize” the algorithm without changing the basic protocol from the perspective of the individual federates?  These are all interesting questions, maybe subject matter for a later post.

References:

  1. IEEE Std 1516.1-2010, IEEE Standard for Modeling and Simulation (M&S) High Level Architecture (HLA): Federate Interface Specification (Section 8).
  2. Fujimoto, R. M., Parallel and Distributed Simulation Systems. New York: John Wiley and Sons, 2000 (Chapter 3).
  3. Lamport, L., Time, Clocks, and the Ordering of Events in a Distributed System, Communications of the ACM, 21(7) July 1978, p. 558-565.
  4. Misra, J., Distributed Discrete-Event Simulation, Computing Surveys, 18(1) March 1986, p. 39-65.

 

Update: Chutes and Ladders is long, but not *that* long

It occurred to me, as I was failing to pay attention in a class this past week, that I neglected an important detail in my recent post analyzing the expected number of turns to complete the game Chutes and Ladders: namely, that one generally does not play the game alone.

Recall from the previous post that we can express the expected value x_i of the number of die rolls (or spins of the spinner) needed for a player to reach the final square 100, starting from square i, “recursively” as

x_i = 1 + \frac{1}{6} \sum_{j=1}^6 x_{f(i,j)}, i < 100

x_{100} = 0

where f(i,j) is the number of the square reached from square i by rolling j (thus encoding the configuration of chutes and ladders on the board).  Solving this system of 100 equations yields the value x_0 of about 39.5984 turns on average for our hypothetical player to finish the game.
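Here is a minimal sketch of that calculation on a toy board, not the real one.  The jumps dictionary stands in for the chutes and ladders, the rule that an overshooting roll is forfeited is an assumption of mine, and the die size is a parameter so that the toy answer can be checked by hand.

```python
from fractions import Fraction

def expected_rolls(final, jumps, sides=6):
    """Return exact x_0 .. x_{final-1} for a single player."""
    def f(i, j):
        if i + j > final:
            return i  # assumption: an overshooting roll is forfeited
        return jumps.get(i + j, i + j)

    # Augmented matrix for the system (I - P) x = 1.
    n = final
    a = [[Fraction(0)] * (n + 1) for _ in range(n)]
    for i in range(n):
        a[i][i] += 1
        a[i][n] = Fraction(1)
        for j in range(1, sides + 1):
            k = f(i, j)
            if k != final:  # x_final = 0 contributes nothing
                a[i][k] -= Fraction(1, sides)

    # Gauss-Jordan elimination in exact rational arithmetic.
    for col in range(n):
        pivot = next(r for r in range(col, n) if a[r][col] != 0)
        a[col], a[pivot] = a[pivot], a[col]
        a[col] = [v / a[col][col] for v in a[col]]
        for r in range(n):
            if r != col and a[r][col] != 0:
                factor = a[r][col]
                a[r] = [vr - factor * vc for vr, vc in zip(a[r], a[col])]
    return [row[n] for row in a]

# Toy board: squares 0..3, a two-sided "die", optional chute from 2 to 0.
plain = expected_rolls(3, {}, sides=2)
with_chute = expected_rolls(3, {2: 0}, sides=2)
print(plain[0], with_chute[0])
```

On this toy board the game takes 3 rolls on average, and 6 once square 2 becomes a chute back to the start.  Reproducing the figure quoted above would need the real board's chutes and ladders and its actual overshoot rule, neither of which I have spelled out here.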

However, it is a bit misleading to stop there, since the expected total number of turns in a game with multiple players does not simply scale directly as the number of players.  That is, for example, in a game with two players, we should not expect nearly 80 total rolls of the die, 40 for each player.  As simulation confirms, the game typically ends much more quickly than that, with an actual average of about 52.5188 total rolls, or only about 26 turns for each player.

(Why is this?  This is a good example of the common situation where we can gain insight by considering “extreme” aspects or versions of the problem.  In this case, first note that the shortest possible path from the start to the final square 100 is just seven moves.  It is very unlikely that any single player will actually take this short route, with a probability of only 73/46656, or about 1 in 640.  But now suppose that instead of just one or even two players, there are one million players.  It is now a near certainty that at least one of those million players will happen to win the lottery and take that short route… and so we should expect that the average number of total turns should be “only” about 7 million, instead of 40 million as the earlier post might suggest.)

We can still compute this two-player expected value exactly, using the same approach as before… but it starts to get expensive, because instead of just 100 equations in 100 unknowns, we now need 100^2 or 10,000 equations, to keep track of the possible positions of both players:

x_{i,j} = 1 + \frac{1}{6} \sum_{k=1}^6 x_{j,f(i,k)}, i,j < 100

x_{i,100} = 0, i < 100

Solving yields the desired value x_{0,0} \approx 52.5188.
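The same toy setting works for two players.  The sketch below is not a direct solve of the linear system: it iterates the recurrence to a fixed point, which is a substitution of mine for brevity.  As before, the board, the forfeited-overshoot rule, and the two-sided die are assumptions made so that the result can be checked by hand.

```python
def two_player_rolls(final, jumps, sides=6, tol=1e-12):
    """Approximate x[i][j]: expected total rolls remaining when the
    player about to roll is on square i and the opponent is on square j."""
    def f(i, k):
        if i + k > final:
            return i  # assumption: an overshooting roll is forfeited
        return jumps.get(i + k, i + k)

    x = [[0.0] * final for _ in range(final)]
    while True:
        change = 0.0
        for i in range(final):
            for j in range(final):
                total = 0.0
                for k in range(1, sides + 1):
                    nxt = f(i, k)
                    if nxt != final:        # game over contributes 0
                        total += x[j][nxt]  # roles swap after the roll
                new = 1.0 + total / sides
                change = max(change, abs(new - x[i][j]))
                x[i][j] = new
        if change < tol:
            return x

# Toy board: squares 0..3, two-sided "die", no chutes or ladders.
x = two_player_rolls(3, {}, sides=2)
print(x[0][0])
```

On this board a lone player needs 3 rolls on average, but a two-player game lasts only about 4 rolls in total rather than 6, which is the same effect in miniature.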