Spartan Documentation

spartan Package

Spartan: A distributed array language.

Spartan expressions and optimizations are defined in the spartan.expr package. The RPC and serialization libraries are defined in spartan.rpc.

A Spartan execution environment consists of a master process and one or more workers; these are defined in the spartan.master and spartan.worker modules respectively.

Workers communicate with each other and the master via RPC; the RPC protocol is based on ZeroMQ and is located in the spartan.rpc package. RPC messages used in Spartan are defined in spartan.core.

For convenience, all array operations are routed through a “context”; this tracks ownership (which worker stores each part of an array) and simplifies sending out RPC messages to many workers at once. For historical reasons, this context is located in spartan.blob_ctx.

spartan.__init__.initialize(argv=None)[source]
spartan.__init__.shutdown()[source]

blob_ctx Module

The BlobCtx manages the state of a Spartan execution: it stores the location of tiles and other workers in the system, and contains methods for fetching and updating array data, creating and removing tiles, and running user-defined kernel functions on tile data.

class spartan.blob_ctx.BlobCtx(worker_id, workers, local_worker=None)[source]

Bases: object

Create a new context.

Parameters:
  • worker_id (int) – Identifier for this worker
  • workers (list of RPC clients) – RPC connections to other workers in the computation.
  • local_worker (Worker) – A reference to the local worker creating this context. This is used to avoid sending RPC messages for operations that can be serviced locally.
create(data, hint=None, timeout=None)[source]

Create a new tile to hold data.

Parameters:
  • data (Numpy array) – Data to store in a tile.
  • hint (int) – Optional. Worker to store data on. If not specified, workers are chosen in round-robin order.
  • timeout (float)
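
The placement rule described for hint can be illustrated with a small standalone sketch. It does not use Spartan; RoundRobinPlacer and choose are hypothetical names, and the real BlobCtx may track its position differently.

```python
import itertools


class RoundRobinPlacer:
    """Hypothetical helper: pick a worker index for each new tile."""

    def __init__(self, num_workers):
        # Endless 0, 1, ..., num_workers - 1, 0, 1, ... sequence.
        self._cycle = itertools.cycle(range(num_workers))

    def choose(self, hint=None):
        # An explicit hint wins; otherwise take the next worker in turn.
        if hint is not None:
            return hint
        return next(self._cycle)


placer = RoundRobinPlacer(3)
placements = [placer.choose() for _ in range(4)]  # wraps around after worker 2
hinted = placer.choose(hint=2)
```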
destroy(tile_id)[source]

Destroy a tile.

Parameters:tile_id (int) – Tile to destroy.
destroy_all(tile_ids)[source]

Destroy all tiles in tile_ids.

Parameters:tile_ids (list) – Tiles to destroy.
get(tile_id, subslice, wait=True, timeout=None)[source]

Fetch a region of a tile.

Parameters:
  • tile_id (int) – Tile to fetch from.
  • subslice (slice or None) – Portion of tile to fetch.
  • wait (boolean) – Wait for this operation to finish before returning.
  • timeout (float)
heartbeat(worker_status, timeout=None)[source]

Send a heartbeat request to the master.

Parameters:
  • worker_status
  • timeout
is_master()[source]

True if this context is running in the master process.

map(tile_ids, mapper_fn, kw, timeout=None)[source]

Run mapper_fn on all tiles in tile_ids.

Parameters:
  • tile_ids (list) – List of tiles to operate on
  • mapper_fn (function) – Function taking (extent, kw)
  • kw (dict) – Keywords to supply to mapper_fn.
  • timeout – optional RPC timeout.
Returns:

dict – mapping from each source tile to the result of mapper_fn

new_tile_id()[source]

Create a new tile id. Does not create a new tile, or any data.

Returns:`TileId` – Id of created tile.
partial_map(targets, tile_ids, mapper_fn, kw, timeout=None)[source]
tile_op(tile_id, fn)[source]

Run fn on a single tile.

Returns:Future – Result of fn.
update(tile_id, region, data, reducer, wait=True, timeout=None)[source]

Update region of tile_id with data.

data is combined with existing tile data using reducer.

Parameters:
  • tile_id (int)
  • region (slice)
  • data (Numpy array)
  • reducer (function) – function from (array, array) -> array
  • wait (boolean) – If true, wait for completion before returning.
  • timeout (float)
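
How a reducer combines new data with existing tile data can be sketched without Spartan or NumPy. The example below uses a plain list as a stand-in for a one-dimensional tile; update_region and elementwise_add are hypothetical names chosen for illustration.

```python
def elementwise_add(existing, data):
    """Example reducer: (array, array) -> array, here on plain lists."""
    return [old + new for old, new in zip(existing, data)]


def update_region(tile, region, data, reducer):
    """Combine data into tile[region] using reducer."""
    existing = tile[region]
    if len(existing) != len(data):
        raise ValueError("data does not match the size of region")
    # The reducer decides how old and new values are merged.
    tile[region] = reducer(existing, data)
    return tile


tile = [1, 1, 1, 1, 1]
update_region(tile, slice(1, 3), [10, 20], elementwise_add)
```

Swapping in a different reducer (for example one that returns data unchanged) would turn the same call into a plain overwrite.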
spartan.blob_ctx.get()[source]

Thread-local: return the context for this process.

spartan.blob_ctx.set(ctx)[source]

Thread-local: set the context for this process.

This is only called by the currently running worker or master.

core Module

Definitions for RPC messages.

These are used for sending and receiving array data (UpdateReq, GetReq and GetResp), running a function on array data (RunKernelReq, RunKernelResp), and registering and initializing workers (RegisterReq, InitializeReq).

class spartan.core.CreateTileReq(*args, **kw)

Bases: spartan.core.Message

members = ['tile_id', 'data']
node_items(node)
node_type = 'CreateTileReq'
class spartan.core.CreateTileResp(*args, **kw)

Bases: spartan.core.Message

members = ['tile_id']
node_items(node)
node_type = 'CreateTileResp'
class spartan.core.DestroyReq(*args, **kw)

Bases: spartan.core.Message

Destroy any tiles listed in ids.

members = ['ids']
node_items(node)
node_type = 'DestroyReq'
class spartan.core.EmptyMessage(*args, **kw)

Bases: spartan.core.Message

members = []
node_items(node)
node_type = 'EmptyMessage'
class spartan.core.GetReq(*args, **kw)

Bases: spartan.core.Message

Fetch a region from a tile.

members = ['id', 'subslice']
node_items(node)
node_type = 'GetReq'
class spartan.core.GetResp(*args, **kw)

Bases: spartan.core.Message

The result of a fetch operation: the tile fetched from and the resulting data.

members = ['id', 'data']
node_items(node)
node_type = 'GetResp'
class spartan.core.HeartbeatReq(*args, **kw)

Bases: spartan.core.Message

members = ['worker_id', 'worker_status']
node_items(node)
node_type = 'HeartbeatReq'
class spartan.core.InitializeReq(*args, **kw)

Bases: spartan.core.Message

Sent from the master to a worker after all workers have registered.

Contains the worker's unique identifier and a list of all other workers in the execution.

members = ['id', 'peers']
node_items(node)
node_type = 'InitializeReq'
class spartan.core.LocalKernelResult(*args, **kw)

Bases: spartan.core.Message

The local result returned from a kernel invocation.

LocalKernelResult.result is returned to the master. LocalKernelResult.futures may be None, or a list of futures that must be waited for before returning the result of this kernel.

members = ['result', 'futures']
node_items(node)
node_type = 'LocalKernelResult'
class spartan.core.Message

Bases: object

Base class for all RPC messages.

class spartan.core.RegisterReq(*args, **kw)

Bases: spartan.core.Message

Sent by worker to master when registering during startup.

members = ['host', 'port', 'worker_status']
node_items(node)
node_type = 'RegisterReq'
class spartan.core.RunKernelReq(*args, **kw)

Bases: spartan.core.Message

Run mapper_fn on the list of tiles in blobs.

For efficiency (since Python serialization is slow), the same message is sent to all workers.

members = ['blobs', 'mapper_fn', 'kw']
node_items(node)
node_type = 'RunKernelReq'
class spartan.core.RunKernelResp(*args, **kw)

Bases: spartan.core.Message

The result returned from running a kernel function.

This is typically a map from Extent to TileId.

members = ['result']
node_items(node)
node_type = 'RunKernelResp'
class spartan.core.TileId

Bases: object

A TileId uniquely identifies a tile in a Spartan execution.

Currently, TileId instances consist of a worker index and a blob index for that worker.

__eq__

x.__eq__(y) <==> x==y

__ge__

x.__ge__(y) <==> x>=y

__gt__

x.__gt__(y) <==> x>y

__hash__

x.__hash__() <==> hash(x)

__le__

x.__le__(y) <==> x<=y

__lt__

x.__lt__(y) <==> x<y

__ne__

x.__ne__(y) <==> x!=y

__repr__

x.__repr__() <==> repr(x)

id
worker
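
A value type with the comparison and hashing behavior listed above can be sketched with a frozen dataclass. TileIdSketch is a hypothetical stand-in, not the Spartan class; the field names mirror the worker and id attributes, and ordering by (worker, id) is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class TileIdSketch:
    worker: int  # index of the worker that stores the tile
    id: int      # index of the blob on that worker


a = TileIdSketch(worker=0, id=7)
b = TileIdSketch(worker=1, id=0)

# Frozen dataclasses are hashable, so ids can be used as dictionary keys.
locations = {a: "tile on worker 0", b: "tile on worker 1"}
```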
class spartan.core.TileOpReq(*args, **kw)

Bases: spartan.core.Message

members = ['tile_id', 'fn']
node_items(node)
node_type = 'TileOpReq'
class spartan.core.UpdateReq(*args, **kw)

Bases: spartan.core.Message

Update region (a slice, or None) of tile with id id.

data should be a Numpy or sparse array. data is combined with existing tile data using the supplied reducer function.

members = ['id', 'region', 'data', 'reducer']
node_items(node)
node_type = 'UpdateReq'
class spartan.core.WorkerStatus

Bases: object

Status information sent to the master in a heartbeat message.

__repr__

x.__repr__() <==> repr(x)

add_task_failure()
add_task_report()
clean_status()
cpu_usage
last_report_time
mem_usage
num_processors
task_failures
task_reports
total_physical_memory
update_status()

cluster Module

Functions for managing a cluster of machines.

Spartan currently supports running workers either as threads in the current process or on one or more machines reached via ssh.

A Spartan “worker” is a single process; more than one worker can be run on a machine; typically one worker is run per core.

class spartan.cluster.AssignMode[source]

Bases: object

BY_CORE = 1
BY_NODE = 2
class spartan.cluster.AssignModeFlag(name, default=None, help='')[source]

Bases: spartan.config.Flag

set(option_str)[source]
class spartan.cluster.HostListFlag(name, default=None, help='')[source]

Bases: spartan.config.Flag

set(str)[source]
spartan.cluster.start_cluster(num_workers, use_cluster_workers)[source]

Start a cluster with num_workers workers.

If use_cluster_workers is True, then use the remote workers defined in spartan.config. Otherwise, workers are all spawned on the localhost.

Parameters:
  • num_workers
  • use_cluster_workers
spartan.cluster.start_remote_worker(worker, st, ed)[source]

Start processes on a worker machine.

The machine will launch worker processes st through ed.

Parameters:
  • worker – hostname to connect to.
  • st – First process index to start.
  • ed – Last process to start.

config Module

Configuration options and flags.

Options may be specified on the command line, or via a configuration file. Configuration files should be placed in $HOME/.config/spartan.ini

To facilitate changing options when running with nosetests, flag values are also parsed out from the SPARTAN_OPTS environment variable.

SPARTAN_OPTS='--profile_master=1 ...' nosetests
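
Reading flag values out of an environment variable can be sketched with the standard library. This is not the spartan.config implementation: parse_opts_from_env is a hypothetical name, and the integer type of profile_master is assumed from the example above.

```python
import argparse
import shlex


def parse_opts_from_env(environ):
    """Parse flag values from a SPARTAN_OPTS-style environment variable."""
    parser = argparse.ArgumentParser()
    # Flag name taken from the example above; its int type is an assumption.
    parser.add_argument("--profile_master", type=int, default=0)
    # shlex.split honors shell-style quoting inside the variable.
    argv = shlex.split(environ.get("SPARTAN_OPTS", ""))
    # parse_known_args tolerates flags this sketch does not declare.
    known, unknown = parser.parse_known_args(argv)
    return known, unknown


flags, extra = parse_opts_from_env({"SPARTAN_OPTS": "--profile_master=1 --other=x"})
```

Passing a dictionary instead of reading os.environ directly keeps the sketch easy to test.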
class spartan.config.BoolFlag(name, default=None, help='')[source]

Bases: spartan.config.Flag

Boolean flag.

Accepts ‘0’ or ‘false’ for false values, ‘1’ or ‘true’ for true values.

set(str)[source]
class spartan.config.Flag(name, default=None, help='')[source]

Bases: object

Base object representing a command-line flag.

Subclasses must implement the set operation to parse a flag value from a command line string.

class spartan.config.Flags[source]

Bases: object

add(flag)[source]
class spartan.config.IntFlag(name, default=None, help='')[source]

Bases: spartan.config.Flag

set(str)[source]
class spartan.config.LogLevelFlag(name, default=None, help='')[source]

Bases: spartan.config.Flag

set(str)[source]
class spartan.config.SortingHelpFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]

Bases: argparse.HelpFormatter

add_arguments(actions)[source]
class spartan.config.StrFlag(name, default=None, help='')[source]

Bases: spartan.config.Flag

set(str)[source]
spartan.config.parse(argv)[source]

Parse configuration from flags and/or configuration file.

master Module

Master process definition.

Spartan computations consist of a master and one or more workers.

The master tracks the location of array data, manages worker health, and runs user operations on workers.

class spartan.master.Master(port, num_workers)[source]

Bases: object

get_available_workers()[source]
get_worker_scores()[source]
get_workers_for_reload(array)[source]
heartbeat(req, handle)[source]

RPC method.

Called by worker processes periodically.

Parameters:
  • req (WorkerStatus)
  • handle (PendingRequest)

Returns: EmptyMessage

init_worker_score(worker_id, worker_status)[source]
is_slow_worker(worker_id)[source]
mark_failed_worker(worker_id)[source]
mark_failed_workers()[source]
register(req, handle)[source]

RPC method.

Register a new worker with the master.

Parameters:
  • req (RegisterReq)
  • handle (PendingRequest)
register_array(array)[source]
shutdown()[source]

Shutdown all workers and halt.

update_avg_score()[source]
update_worker_score(worker_id, worker_status)[source]
wait_for_initialization()[source]

Blocks until all workers are initialized.

spartan.master.get()[source]

node Module

Helper for constructing trees of objects.

Provides pretty printing, equality testing, hashing and keyword initialization.

class spartan.node.Node[source]

Bases: object

spartan.node.get_members(klass)[source]

Walk through classes in mro order, accumulating member names.

spartan.node.get_mro(klass)[source]
spartan.node.get_reverse_mro(klass)[source]
spartan.node.node_initializer(self, *args, **kw)[source]
spartan.node.node_iteritems(node)[source]
spartan.node.node_str(node)[source]
spartan.node.node_type(klass)[source]

Decorator to add node behavior to a class.
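
The kind of behavior such a decorator might add can be sketched as follows. This is an illustration only: node_type_sketch and GetReqSketch are hypothetical, and the real spartan.node implementation may differ in every detail. The sketch derives keyword initialization, equality, hashing and a readable repr from a members list.

```python
def node_type_sketch(klass):
    """Hypothetical decorator: add keyword init, equality, hashing and repr."""
    members = list(klass.members)

    def __init__(self, *args, **kw):
        # Positional arguments fill members in order; keywords fill the rest.
        values = dict(zip(members, args))
        values.update(kw)
        for name in members:
            setattr(self, name, values.get(name))

    def key(self):
        return tuple(getattr(self, name) for name in members)

    def __eq__(self, other):
        return type(self) is type(other) and key(self) == key(other)

    def __repr__(self):
        fields = ", ".join("%s=%r" % (n, getattr(self, n)) for n in members)
        return "%s(%s)" % (klass.__name__, fields)

    klass.__init__ = __init__
    klass.__eq__ = __eq__
    klass.__hash__ = lambda self: hash((klass.__name__, key(self)))
    klass.__repr__ = __repr__
    return klass


@node_type_sketch
class GetReqSketch(object):
    members = ["id", "subslice"]


req = GetReqSketch(id=3, subslice=None)
```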

util Module

class spartan.util.Assert[source]

Bases: object

Assertion helper functions.

a = 'foo'
b = 'bar'

Assert.eq(a, b)
# equivalent to:
# assert a == b, 'a == b failed (%s vs %s)' % (a, b)
static all_close(a, b)[source]
static all_eq(a, b)[source]
static eq(a, b, fmt='', *args)[source]
static ge(a, b, fmt='', *args)[source]
static gt(a, b, fmt='', *args)[source]
static isinstance(expr, klass)[source]
static iterable(expr)[source]
static le(a, b, fmt='', *args)[source]
static lt(a, b, fmt='', *args)[source]
static ne(a, b, fmt='', *args)[source]
static no_duplicates(collection)[source]
static not_null(expr)[source]
static true(expr)[source]
class spartan.util.EZTimer(name)[source]

Bases: object

Lazy timer.

Prints elapsed time when destroyed.

class spartan.util.FileWatchdog(file_handle=sys.stdin, on_closed=<lambda>)[source]

Bases: threading.Thread

Watchdog for a file (typically sys.stdin or sys.stdout).

When the file closes, terminate the process. (This occurs when an ssh connection is terminated, for example.)

Parameters:
  • file_handle
  • on_closed
run()[source]
class spartan.util.Timer[source]

Bases: object

start()[source]
stop()[source]
spartan.util.as_list(x)[source]
spartan.util.copy_docstring(source_function)[source]

Decorator.

Copy the docstring from source_function to this function.

spartan.util.count_calls(fn)[source]

Decorator: count calls to fn and print the count after every 100 calls.

Parameters:fn

spartan.util.divup(a, b)[source]
spartan.util.dump_stacks(out)[source]

Dump the stacks of all threads.

spartan.util.findCaller(obj)[source]
spartan.util.flatten(lst, depth=1, unique=False)[source]
spartan.util.get_core_mapping()[source]

Read /proc/cpuinfo and return a dictionary mapping from:

processor_id -> (package, core)
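
A minimal sketch of building such a mapping, assuming the usual Linux layout in which each processor block lists "processor", "physical id" and "core id" fields and blocks are separated by blank lines. parse_core_mapping is a hypothetical name, and it takes the file contents as a string so it can run on any platform.

```python
def parse_core_mapping(cpuinfo_text):
    """Return {processor_id: (package, core)} from /proc/cpuinfo-style text."""
    mapping = {}
    current = {}
    # The trailing empty string flushes the last processor block.
    for line in cpuinfo_text.splitlines() + [""]:
        if not line.strip():
            if "processor" in current:
                mapping[int(current["processor"])] = (
                    int(current.get("physical id", 0)),
                    int(current.get("core id", 0)),
                )
            current = {}
            continue
        key, _, value = line.partition(":")
        current[key.strip()] = value.strip()
    return mapping


sample = "\n".join([
    "processor\t: 0",
    "physical id\t: 0",
    "core id\t\t: 0",
    "",
    "processor\t: 1",
    "physical id\t: 0",
    "core id\t\t: 1",
])
core_map = parse_core_mapping(sample)
```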

spartan.util.is_iterable(x)[source]
spartan.util.is_lambda(fn)[source]

Return True if fn is a lambda expression.

Testing against types.LambdaType does not work, because LambdaType is an alias for FunctionType and therefore matches every function.

spartan.util.join_tuple(tuple_a, tuple_b)[source]
spartan.util.log_debug(*args, **kw)[source]
spartan.util.log_error(*args, **kw)[source]
spartan.util.log_fatal(*args, **kw)[source]
spartan.util.log_info(*args, **kw)[source]
spartan.util.log_warn(*args, **kw)[source]
spartan.util.memoize(f)[source]

Decorator.

Cache outputs of f; repeated calls with the same arguments will be served from the cache.
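
The caching behavior can be sketched as below. memoize_sketch is a hypothetical stand-in for the real decorator; it assumes positional, hashable arguments only.

```python
import functools


def memoize_sketch(f):
    """Cache results of f, keyed by its (hashable) positional arguments."""
    cache = {}

    @functools.wraps(f)
    def wrapped(*args):
        if args not in cache:
            cache[args] = f(*args)
        return cache[args]

    return wrapped


calls = []


@memoize_sketch
def square(x):
    calls.append(x)  # record each real invocation
    return x * x


results = [square(4), square(4), square(5)]
```

After the three calls, calls holds only two entries because the second square(4) was served from the cache.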

spartan.util.rtype_check(typeclass)[source]

Function decorator to check return type.

Usage:

@rtype_check(int)
def fn(x, y, z):
  return x + y
spartan.util.stack_signal()[source]
spartan.util.synchronized(fn)[source]

Decorator: execution of this function is serialized by a threading.RLock.

Parameters:fn
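
One way such a decorator could be written is sketched here. synchronized_sketch is hypothetical, and using one lock per decorated function is an assumption; the real decorator may share a lock differently.

```python
import functools
import threading


def synchronized_sketch(fn):
    """Serialize calls to fn with a re-entrant lock owned by the wrapper."""
    lock = threading.RLock()

    @functools.wraps(fn)
    def wrapped(*args, **kw):
        with lock:
            return fn(*args, **kw)

    return wrapped


counter = {"value": 0}


@synchronized_sketch
def increment(times):
    for _ in range(times):
        counter["value"] += 1


threads = [threading.Thread(target=increment, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

An RLock rather than a plain Lock lets a synchronized function call itself recursively without deadlocking.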

spartan.util.timeit(f, name=None)[source]

Run f and return (time_taken, result).

Parameters:
  • f
  • name
spartan.util.timer_ctx(*args, **kwds)[source]

Context based timer:

Usage:

with timer_ctx('LoopOp'):
  for i in range(10):
    my_op()
spartan.util.trace_fn(fn)[source]

Function decorator: log on entry and exit to fn.

worker Module

This module defines the Worker class and related helper functions.

Workers in Spartan manage array data and computation; methods are available for creating, updating, reading and deleting tiles of arrays. Workers can also run a user-specified function on a set of tiles.

Workers periodically send a heartbeat message to the master; if the master cannot be contacted for a sufficiently long interval, workers shut themselves down.

class spartan.worker.Worker(master)[source]

Bases: object

Spartan workers generally correspond to one core of a machine.

Workers manage the storage of array data and running of kernel functions.

id int

The unique identifier for this worker

_peers dict

Mapping from worker id to RPC client

_blobs dict

Mapping from tile id to tile.

create(req, handle)[source]

Create a new tile.

Parameters:
  • req (CreateTileReq)
  • handle (PendingRequest)
destroy(req, handle)[source]

Delete zero or more blobs.

Parameters:
  • req (DestroyReq)
  • handle (PendingRequest)
get(req, handle)[source]

Fetch a portion of a tile.

Parameters:
  • req (GetReq)
  • handle (PendingRequest)
initialize(req, handle)[source]

Initialize worker.

Assigns this worker a unique identifier and sets up connections to all other workers in the process.

Parameters:
  • req (InitializeReq)
  • handle (PendingRequest)
run_kernel(req, handle)[source]

Run a kernel on tiles local to this worker.

Parameters:
  • req (RunKernelReq)
  • handle (PendingRequest)
shutdown(req, handle)[source]

Shutdown this worker.

Shutdown is deferred to another thread to ensure the RPC reply is sent before the poll loop is killed.

Parameters:
  • req (EmptyMessage)
  • handle (PendingRequest)
tile_op(req, handle)[source]
update(req, handle)[source]

Apply an update to a tile.

Parameters:
  • req (UpdateReq)
  • handle (PendingRequest)
wait_for_shutdown()[source]

Wait for the worker to shut down.

Periodically send heartbeat updates to the master.

array Package

distarray Module

class spartan.array.distarray.Broadcast(base, shape)[source]

Bases: spartan.array.distarray.DistArray

Mimics the behavior of Numpy broadcasting.

Given an input of shape (x, y) and a desired output shape (x, y, z), the broadcast object reports shape=(x, y, z) and overrides __getitem__ to return the appropriate values.

fetch(ex)[source]
class spartan.array.distarray.DistArray[source]

Bases: object

The interface required for distributed arrays.

A distributed array should support:

  • fetch(ex) to fetch data
  • update(ex, data) to combine an update with existing data
  • foreach_tile(fn, kw)
fetch(ex)[source]

Fetch the region specified by extent from this array.

Parameters:ex (Extent) – Region to fetch
Returns:np.ndarray – Data from region.
foreach_tile(mapper_fn, kw)[source]
glom()[source]
map_to_array(mapper_fn, kw=None)[source]
ndim[source]
select(idx)[source]

Effectively __getitem__.

Renamed to avoid the chance of accidentally using a slow, local operation on a distributed array.

update(ex, data)[source]
class spartan.array.distarray.DistArrayImpl(shape, dtype, tiles, reducer_fn, sparse)[source]

Bases: spartan.array.distarray.DistArray

__del__()[source]

Destroy this array.

NB: Destruction is actually deferred until the next usage of the blob_ctx. __del__ can be called at any time, including during an RPC call, which leads to odd/bad behavior.

extent_for_blob(id)[source]
fetch(region)[source]

Return a local numpy array for the given region.

If necessary, data will be copied from remote hosts to fill the region.

Parameters:region – Extent indicating the region to fetch.

foreach_tile(mapper_fn, kw=None)[source]
id()[source]
tile_shape()[source]
update(region, data, wait=True)[source]
update_slice(slc, data)[source]
class spartan.array.distarray.LocalWrapper(data)[source]

Bases: spartan.array.distarray.DistArray

Provide the DistArray interface for local data.

dtype[source]
fetch(ex)[source]
foreach_tile(mapper_fn, kw=None)[source]
map_to_array(mapper_fn, kw=None)[source]
shape[source]
class spartan.array.distarray.Slice(darray, idx)[source]

Bases: spartan.array.distarray.DistArray

Represents a Numpy multi-dimensional slice on a base DistArray.

Slices in Spartan do not result in a copy. A Slice object is returned instead. Slice objects support mapping (foreach_tile) and fetch operations.

bad_tiles[source]
fetch(idx)[source]
foreach_tile(mapper_fn, kw)[source]
spartan.array.distarray.as_array(data)[source]

Convert data to behave like a DistArray.

If data is already a DistArray, it is returned unchanged. Otherwise, data is wrapped to have a DistArray interface.

Parameters:data – An input array or array-like value.
spartan.array.distarray.best_locality(array, ex)[source]

Return the table shard with the best locality for extent ex.

Parameters:
  • table
  • ex

spartan.array.distarray.broadcast(args)[source]

Convert the list of arrays in args to have the same shape.

Extra dimensions are added as necessary, and dimensions of size 1 are repeated to match the size of other arrays.

Parameters:args – List of DistArray
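
The shape computation behind this kind of broadcasting can be sketched in plain Python. The sketch assumes NumPy's rule of aligning shapes on their trailing dimensions; broadcast_shape is a hypothetical helper that works on shape tuples only, not on DistArray objects.

```python
def broadcast_shape(*shapes):
    """Return the common shape, following NumPy's right-aligned rule."""
    ndim = max(len(s) for s in shapes)
    # Left-pad shorter shapes with 1s so every shape has ndim entries.
    padded = [(1,) * (ndim - len(s)) + tuple(s) for s in shapes]
    result = []
    for dims in zip(*padded):
        sizes = {d for d in dims if d != 1}
        if len(sizes) > 1:
            raise ValueError("shapes %r cannot be broadcast together" % (shapes,))
        # Size-1 dimensions are repeated to match the other arrays.
        result.append(sizes.pop() if sizes else 1)
    return tuple(result)


common = broadcast_shape((4, 1), (3,), (2, 1, 3))
```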
spartan.array.distarray.broadcast_mapper(ex, tile, mapper_fn=None, bcast_obj=None)[source]
spartan.array.distarray.compute_extents(shape, tile_hint=None, num_shards=-1)[source]

Split an array of shape shape into Extents. Each extent contains roughly TILE_SIZE elements if num_shards is -1.

Parameters:
  • shape – tuple. the array’s shape.
  • tile_hint – tuple indicating the desired tile shape.
Returns:

list – list of Extent

spartan.array.distarray.compute_splits(shape, tile_hint)[source]

Compute splits for each dimension of an array of shape shape, based on tile_hint.

Parameters:
  • shape – tuple. the array’s shape.
  • tile_hint – tuple indicating the desired tile shape.
Returns:

list – splits for each dimension.
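
A minimal sketch of per-dimension splitting, assuming each split is a half-open (start, stop) range and that the last range in a dimension may be shorter than the hint. compute_splits_sketch is hypothetical; the actual return format of compute_splits is not specified here.

```python
def compute_splits_sketch(shape, tile_hint):
    """For each dimension, list the (start, stop) ranges of size tile_hint[i]."""
    splits = []
    for dim, step in zip(shape, tile_hint):
        step = max(1, min(step, dim))  # keep the step within [1, dim]
        splits.append(
            [(start, min(start + step, dim)) for start in range(0, dim, step)]
        )
    return splits


splits = compute_splits_sketch((10, 4), (4, 4))
```

Taking the Cartesian product of the per-dimension ranges would then give one rectangular region per tile.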

spartan.array.distarray.create(shape, dtype=<type 'float'>, sharder=None, reducer=None, tile_hint=None, sparse=False)[source]

Make a new, empty DistArray

spartan.array.distarray.from_replica(X)[source]

Make a new, empty DistArray from X

spartan.array.distarray.from_table(extents)[source]

Construct a distarray from an existing table. Keys must be of type Extent, values of type Tile.

Shape is computed as the maximum range of all extents.

Dtype is taken from the dtype of the tiles.

Parameters:table
spartan.array.distarray.good_tile_shape(shape, num_shards=-1)[source]

Compute a tile_shape (tile_hint) for the array.

Parameters:shape – tuple. the array’s shape.
Returns:list – tile_shape for the array
spartan.array.distarray.largest_value(vals)[source]

Return the largest array (using the underlying size for Broadcast objects).

Parameters:vals – List of DistArray.
spartan.array.distarray.take_first(a, b)[source]

extent Module

class spartan.array.extent.TileExtent

Bases: object

A rectangular tile of a distributed array.

These correspond (roughly) to a slice taken from an array (without any step component).

Arrays are indexed from the upper-left; for an array of shape (sx, sy, sz): (0,0...) is the upper-left corner of an array, and (sx,sy,sz...) the lower-right.

Extents are represented by an upper-left corner (inclusive) and a lower right corner (exclusive): [ul, lr). In addition, they carry the shape of the array they are a part of; this is used to compute global position information.

__eq__

x.__eq__(y) <==> x==y

__ge__

x.__ge__(y) <==> x>=y

__getitem__

x.__getitem__(y) <==> x[y]

__gt__

x.__gt__(y) <==> x>y

__hash__

x.__hash__() <==> hash(x)

__le__

x.__le__(y) <==> x<=y

__lt__

x.__lt__(y) <==> x<y

__ne__

x.__ne__(y) <==> x!=y

__repr__

x.__repr__() <==> repr(x)

add_dim()
array_shape
clone()
get_lr()
get_ul()
lr
ndim
ravelled_pos()
set_lr()
set_ul()
shape
size
to_global()

Convert idx from a local offset in this tile to a global offset.

to_slice()
ul
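
The [ul, lr) representation can be illustrated with a small standalone class. ExtentSketch is hypothetical and covers only a few of the attributes listed above; in particular, returning a tuple index from to_global is an assumption about what a global offset looks like.

```python
class ExtentSketch:
    """Half-open region [ul, lr) of an array with shape array_shape."""

    def __init__(self, ul, lr, array_shape):
        self.ul = tuple(ul)
        self.lr = tuple(lr)
        self.array_shape = tuple(array_shape)

    @property
    def shape(self):
        return tuple(hi - lo for lo, hi in zip(self.ul, self.lr))

    @property
    def size(self):
        total = 1
        for length in self.shape:
            total *= length
        return total

    def to_slice(self):
        # lr is exclusive, which matches Python slice semantics directly.
        return tuple(slice(lo, hi) for lo, hi in zip(self.ul, self.lr))

    def to_global(self, idx):
        # Shift a tile-local index by the upper-left corner.
        return tuple(lo + i for lo, i in zip(self.ul, idx))


ex = ExtentSketch(ul=(2, 0), lr=(4, 5), array_shape=(10, 5))
```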
spartan.array.extent.compute_slice()

Return a new TileExtent representing base[idx]

Parameters:
  • base (TileExtent)
  • idx – int, slice, or tuple(slice,...)
spartan.array.extent.create()

Create a new extent with the given coordinates and array shape.

Parameters:
  • ul (tuple)
  • lr
  • array_shape
spartan.array.extent.drop_axis()
spartan.array.extent.find_overlapping()

Return the extents that overlap with region.

Parameters:
  • extents – List of extents to search over.
  • region – Extent to match.
spartan.array.extent.find_rect()

Return a new (ravelled_ul, ravelled_lr) to make a rectangle for shape. If (ravelled_ul, ravelled_lr) already forms a rectangle, just return it.

Parameters:
  • ravelled_ul
  • ravelled_lr
spartan.array.extent.find_shape()

Given a list of extents, return the shape of the array necessary to fit all of them.

Parameters:extents

spartan.array.extent.from_shape()
spartan.array.extent.from_slice()

Construct a TileExtent from a slice or tuple of slices.

Parameters:
  • idx – int, slice, or tuple(slice...)
  • shape – shape of the input array
Return type:

TileExtent corresponding to idx.

spartan.array.extent.index_for_reduction()
spartan.array.extent.intersection()
Return type:The intersection of the 2 extents as a TileExtent, or None if the intersection is empty.
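
The intersection of two half-open regions can be sketched with plain tuples. intersection_sketch is hypothetical and represents each extent as a (ul, lr) pair rather than a TileExtent.

```python
def intersection_sketch(a, b):
    """Intersect two (ul, lr) regions; return None if they do not overlap."""
    (a_ul, a_lr), (b_ul, b_lr) = a, b
    ul = tuple(max(x, y) for x, y in zip(a_ul, b_ul))
    lr = tuple(min(x, y) for x, y in zip(a_lr, b_lr))
    # lr is exclusive, so a zero-width overlap in any dimension is empty.
    if any(lo >= hi for lo, hi in zip(ul, lr)):
        return None
    return ul, lr


overlap = intersection_sketch(((0, 0), (4, 4)), ((2, 3), (6, 8)))
disjoint = intersection_sketch(((0, 0), (2, 2)), ((2, 0), (4, 2)))
```

The second call returns None because the two regions only touch along an edge, and the lower-right corner is exclusive.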
spartan.array.extent.is_complete()

Returns true if slices is a complete covering of shape; that is:

array[slices] == array
Parameters:
  • shape – tuple of int
  • slices – list/tuple of slice objects
Return type:

boolean
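
The completeness check can be sketched using slice.indices, which normalizes None and out-of-range bounds for a given length. is_complete_sketch is hypothetical; treating dimensions with no slice as fully selected is an assumption borrowed from NumPy indexing.

```python
def is_complete_sketch(shape, slices):
    """True if array[slices] would select every element of an array of shape."""
    if len(slices) > len(shape):
        return False
    for dim, slc in zip(shape, slices):
        # A full, forward, unit-step slice normalizes to (0, dim, 1).
        if slc.indices(dim) != (0, dim, 1):
            return False
    # Dimensions without a slice are selected in full by default.
    return True


full = is_complete_sketch((4, 5), (slice(None), slice(0, 5)))
partial = is_complete_sketch((4, 5), (slice(1, 4),))
```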

spartan.array.extent.offset_from()
Return type:

A new extent using this extent as a basis, instead of (0,0,0...)

spartan.array.extent.offset_slice()
Return type:

A slice representing the local offsets of other into this tile.

spartan.array.extent.ravelled_pos()
spartan.array.extent.shape_for_reduction()

Return the shape for the result of applying a reduction along axis to an input of shape input_shape.

Parameters:
  • input_shape
  • axis
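
A minimal sketch of the shape computation, assuming the NumPy convention in which the reduced axis is dropped and axis=None reduces to a scalar. shape_for_reduction_sketch is hypothetical, and Spartan's own function may treat these cases differently.

```python
def shape_for_reduction_sketch(input_shape, axis):
    """Shape after reducing input_shape along axis (None reduces everything)."""
    if axis is None:
        return ()
    axis = axis % len(input_shape)  # allow negative axes
    return tuple(input_shape[:axis]) + tuple(input_shape[axis + 1:])


reduced = shape_for_reduction_sketch((3, 4, 5), 1)
```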

spartan.array.extent.shapes_match()

Return true if the shape of data matches the extent offset.

Parameters:
  • offset
  • data

spartan.array.extent.unravelled_pos()

Unravel idx into an index into an array of shape array_shape.

Parameters:
  • idx – int
  • array_shape – tuple
Return type:

tuple indexing into array_shape
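
Unravelling can be sketched as repeated division, assuming row-major (C) ordering. unravel_sketch and ravel_sketch are hypothetical helpers, shown together because each is the inverse of the other.

```python
def unravel_sketch(idx, array_shape):
    """Convert a flat row-major offset into a tuple index for array_shape."""
    pos = []
    # Peel off the fastest-varying (last) dimension first.
    for dim in reversed(array_shape):
        idx, remainder = divmod(idx, dim)
        pos.append(remainder)
    return tuple(reversed(pos))


def ravel_sketch(pos, array_shape):
    """Inverse of unravel_sketch: tuple index to flat row-major offset."""
    flat = 0
    for p, dim in zip(pos, array_shape):
        flat = flat * dim + p
    return flat


position = unravel_sketch(17, (3, 4, 5))
```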

tile Module

class spartan.array.tile.Tile(shape, dtype, data, mask, tile_type)[source]

Bases: object

Tiles have 4 modes:

  • Empty – no data or mask
  • Masked – data + mask
  • Sparse – hashmap of positions (implicit mask)
  • Dense – all data values have been set, mask is cleared.

data[source]
get(subslice=None)[source]
update(subslice, data, reducer)[source]
spartan.array.tile.from_data(data)[source]
spartan.array.tile.from_intersection(src, overlap, data)[source]

Return a tile for src, masked to update the area specified by overlap.

Parameters:
  • src (TileExtent)
  • overlap (TileExtent)
  • data
spartan.array.tile.from_shape(shape, dtype, tile_type)[source]
spartan.array.tile.merge(old_tile, subslice, update, reducer)[source]

expr Package

Definitions of expressions and optimizations.

In Spartan, operations are not performed immediately. Instead, they are represented using a graph of Expr nodes. Expression graphs can be evaluated using the Expr.evaluate or Expr.force methods.
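
The idea of deferring work into a graph can be shown with a toy example. LazyExpr and val are hypothetical and operate on plain numbers; they only illustrate deferred evaluation with a cached result, not Spartan's Expr class or its optimizer.

```python
class LazyExpr:
    """Hypothetical expression node: records an operation instead of running it."""

    def __init__(self, fn, *deps):
        self.fn = fn
        self.deps = deps
        self._result = None
        self._evaluated = False

    def evaluate(self):
        # Evaluate dependencies first, then this node; cache the result.
        if not self._evaluated:
            args = [dep.evaluate() for dep in self.deps]
            self._result = self.fn(*args)
            self._evaluated = True
        return self._result

    def __add__(self, other):
        return LazyExpr(lambda a, b: a + b, self, other)

    def __mul__(self, other):
        return LazyExpr(lambda a, b: a * b, self, other)


def val(x):
    """Wrap a plain value as a leaf node."""
    return LazyExpr(lambda: x)


graph = (val(2) + val(3)) * val(4)  # builds nodes; nothing is computed yet
pending = graph._evaluated
result = graph.evaluate()
```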

The base module contains the definition of Expr, the base class for all types of expressions. It also defines subclasses for wrapping common Python values: lists (ListExpr), dicts (DictExpr) and tuples (TupleExpr).

Operations are built up using a few high-level operations – these all live in their own modules:

Optimizations on DAGs live in spartan.expr.optimize.

base Module

Defines the base class of all expressions (Expr), as well as common subclasses for collections.

class spartan.expr.base.AsArray(*args, **kw)[source]

Bases: spartan.expr.base.Expr

Promote a value to be array-like.

This should be wrapped around most user-inputs that may be used in an array context, e.g. (1 + x => map((as_array(1), as_array(x)), +))

compute_shape()[source]
label()[source]
members = ['val', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'AsArray'
visit(visitor)[source]
class spartan.expr.base.CollectionExpr[source]

Bases: spartan.expr.base.Expr

CollectionExpr subclasses wrap normal tuples, lists and dicts with Expr semantics.

CollectionExpr.visit and CollectionExpr.evaluate will visit or evaluate all of the tuple, list or dictionary elements in this expression.

needs_cache = False
class spartan.expr.base.DictExpr(*args, **kw)[source]

Bases: spartan.expr.base.CollectionExpr

dependencies()[source]
iteritems()[source]
keys()[source]
members = ['vals', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'DictExpr'
values()[source]
visit(visitor)[source]
class spartan.expr.base.EvalCache[source]

Bases: object

Expressions can be copied around and changed during optimization or due to user actions; we want to ensure that a cache entry can be found using any of the equivalent expression nodes.

To that end, expressions are identified by an expression ID; when an expression is copied, the expression ID remains the same.

The cache tracks results based on expression IDs. Since this is no longer directly linked to an expression's lifetime, we have to manually track reference counts here, and clear items from the cache when the reference count hits zero.

deregister(expr_id)[source]
get(exprid)[source]
register(exprid)[source]
set(exprid, value)[source]
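
The reference-counting scheme described for EvalCache can be sketched as follows. RefCountedCache is a hypothetical stand-in that mirrors the four method names above; the internal dictionaries are assumptions.

```python
class RefCountedCache:
    """Hypothetical cache keyed by expression ID with manual reference counts."""

    def __init__(self):
        self.refs = {}
        self.cache = {}

    def register(self, expr_id):
        self.refs[expr_id] = self.refs.get(expr_id, 0) + 1

    def deregister(self, expr_id):
        self.refs[expr_id] -= 1
        if self.refs[expr_id] == 0:
            # The last copy of the expression is gone: drop the cached result.
            del self.refs[expr_id]
            self.cache.pop(expr_id, None)

    def set(self, expr_id, value):
        self.cache[expr_id] = value

    def get(self, expr_id):
        return self.cache.get(expr_id)


cache = RefCountedCache()
cache.register(1)  # original expression
cache.register(1)  # a copy made during optimization shares the ID
cache.set(1, "result")
cache.deregister(1)
still_cached = cache.get(1)
cache.deregister(1)
after_release = cache.get(1)
```

The cached value survives the first deregister because one copy of the expression is still alive; it is cleared only when the count reaches zero.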
class spartan.expr.base.Expr[source]

Bases: object

Base class for all expressions.

Expr objects capture user operations.

An expression can have one or more dependencies, which must be evaluated before the expression itself.

Expressions may be evaluated (using Expr.force); the result of evaluating an expression is cached until the expression itself is reclaimed.

__mul__(other)[source]

Multiply 2 expressions.

Parameters:other (Expr)
__rmul__(other)

Multiply 2 expressions.

Parameters:other (Expr)
argmax(x, axis=None)

Compute argmax over axis.

See numpy.ndarray.argmax.

Parameters:
  • x – Expr to compute a maximum over.
  • axis – Axis (integer or None).
argmin(x, axis=None)

Compute argmin over axis.

See numpy.ndarray.argmin.

Parameters:
  • x – Expr to compute a minimum over.
  • axis – Axis (integer or None).
astype(x, dtype)

Convert x to a new dtype.

See numpy.ndarray.astype.

Parameters:
  • x – Expr or DistArray
  • dtype
cache()[source]

Return a cached value for this Expr.

If a cached value is not available, or the cached array is invalid (missing tiles), returns None.

compute_shape()[source]

Compute the shape of this expression.

If the shape is not available (data dependent), raises NotShapeable.

Returns:tuple – Shape of this expression.
dependencies()[source]
Returns:Dictionary mapping from name to Expr.
evaluate()[source]

Evaluate an Expr.

Dependencies are evaluated prior to evaluating the expression. The result of the evaluation is stored in the expression cache; future calls to evaluate will return the cached value.

Returns:DistArray
force()[source]

Evaluate this expression (and all dependencies).

glom()[source]

Evaluate this expression and convert the resulting distributed array into a Numpy array.

Return type:np.ndarray
graphviz()[source]

Return a string suitable for use with the ‘dot’ command.

label()[source]

Graphviz label for this node.

load_data(cached_result)[source]
mean(x, axis=None)

Compute the mean of x over axis.

See numpy.ndarray.mean.

Parameters:
  • x (Expr)
  • axis – integer or None
ndim[source]
needs_cache = True
node_init()[source]
optimized()[source]

Return an optimized version of this expression graph.

Return type:Expr
outer(a, b)
ravel(v)

“Ravel” v to a one-dimensional array of shape (size(v),).

See numpy.ndarray.ravel.

Parameters:v – Expr or DistArray

reshape(new_shape)[source]

Return a new array with shape new_shape, and data from this array.

Parameters:new_shape – tuple with same total size as original shape.
shape[source]

Try to compute the shape of this expression.

If the value has been computed already this always succeeds.

Return type:tuple
size[source]
sum(x, axis=None)

Sum x over axis.

Parameters:
  • x – The array to sum.
  • axis – Either an integer or None.
typename()[source]
visit(visitor)[source]

Apply visitor to all children of this node, returning a new Expr of the same type.

Parameters:visitor (OptimizePass)
class spartan.expr.base.ExprTrace[source]

Bases: object

Captures the stack trace for an expression.

Lazy evaluation and optimization can result in stack traces that are very far from the actual source of an error. To combat this, expressions track their original creation point, which is logged when an error occurs.

Multiple stack traces can be tracked, as certain optimizations will combine multiple expressions together.

dump()[source]
format_stack()[source]
fuse(trace)[source]
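
As a rough illustration of the idea behind ExprTrace, the sketch below records the call stack at construction time and lets two traces be combined. CreationTrace and build_expression are hypothetical names; only the standard traceback module is used, and the real class may store or format its traces differently.

```python
import traceback


class CreationTrace:
    """Hypothetical helper: remembers where an object was constructed."""

    def __init__(self):
        # Drop the last frame (this __init__) so the trace ends at the caller.
        self.stacks = [traceback.extract_stack()[:-1]]

    def fuse(self, other):
        # Combining two expressions keeps both creation points.
        self.stacks.extend(other.stacks)

    def format_stack(self):
        lines = []
        for stack in self.stacks:
            lines.extend(traceback.format_list(stack))
        return lines


def build_expression():
    return CreationTrace()


t1 = build_expression()
t2 = build_expression()
t1.fuse(t2)
report = "".join(t1.format_stack())
```

When an error surfaces later, during evaluation, printing report points back at build_expression rather than at the evaluator.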
class spartan.expr.base.ListExpr(*args, **kw)[source]

Bases: spartan.expr.base.CollectionExpr

dependencies()[source]
members = ['vals', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'ListExpr'
visit(visitor)[source]
exception spartan.expr.base.NotShapeable[source]

Bases: exceptions.Exception

Thrown when the shape for an expression cannot be computed without first evaluating the expression.

class spartan.expr.base.TupleExpr(*args, **kw)[source]

Bases: spartan.expr.base.CollectionExpr

dependencies()[source]
members = ['vals', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'TupleExpr'
visit(visitor)[source]
class spartan.expr.base.Val(*args, **kw)[source]

Bases: spartan.expr.base.Expr

Convert an existing value to an expression.

compute_shape()[source]

Apply visitor to all children of this node, returning a new Expr of the same type.

Parameters:visitor – OptimizePass
dependencies()[source]
members = ['val', 'expr_id', 'stack_trace']
needs_cache = False
node_items(node)
node_type = 'Val'
visit(visitor)[source]

Apply visitor to all children of this node, returning a new Expr of the same type.

Parameters:visitor – OptimizePass
spartan.expr.base.as_array(v)[source]

Convert a numpy value or scalar into an Expr.

Parameters:v – Expr, numpy value or scalar.
spartan.expr.base.eager(node)[source]

Eagerly evaluate node and convert the result back into an Expr.

Parameters:node – Expr to evaluate.
spartan.expr.base.evaluate(node)[source]

Evaluate node.

Parameters:node – Expr
spartan.expr.base.expr_like(expr, **kw)[source]

Construct a new expression like expr.

The new expression has the same id, but is initialized using kw.

spartan.expr.base.force(node)[source]

Evaluate node.

Parameters:node – Expr

spartan.expr.base.glom(value)[source]

Evaluate this expression and return the result as a numpy.ndarray.

spartan.expr.base.lazify(val)[source]

Lift val into an Expr node.

If val is already an expression, it is returned unmodified.

Parameters:val – anything.
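
A minimal sketch of the lifting rule just described, using toy classes. Expr and Val here are stand-ins defined in the snippet, and lazify_sketch is a hypothetical name; the real function may also treat lists and tuples specially (the package defines ListExpr and TupleExpr), which this sketch does not attempt.

```python
class Expr:
    """Toy base class standing in for spartan.expr.base.Expr."""


class Val(Expr):
    """Wraps an already-computed value."""

    def __init__(self, val):
        self.val = val


def lazify_sketch(val):
    # Expressions pass through untouched; anything else is wrapped.
    if isinstance(val, Expr):
        return val
    return Val(val)


wrapped = lazify_sketch(42)
same = lazify_sketch(wrapped)   # already an Expr, so returned as-is
```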
spartan.expr.base.optimized_dag(node)[source]

Optimize and return the DAG representing this expression.

Parameters:node – The node to compute a DAG for.

builtins Module

Basic numpy style operations on arrays.

These include –

spartan.expr.builtins.abs(v)[source]
spartan.expr.builtins.add(a, b)[source]
spartan.expr.builtins.arange(shape, dtype=<type 'float'>, tile_hint=None)[source]

An extended version of np.arange.

Returns a new array of the given shape and dtype. Values of the array are equivalent to running: np.arange(np.prod(shape)).reshape(shape).

Parameters:
  • shape
  • dtype
  • tile_hint
Return type:

Expr
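The value layout of the extended arange can be sketched without NumPy: the integers 0 to prod(shape) - 1 are laid out in row-major order. The helper arange_nd below is a hypothetical name and works on nested Python lists, assuming a non-empty shape with positive dimensions; it says nothing about how Spartan tiles the result.

```python
from functools import reduce
from operator import mul


def arange_nd(shape):
    """Nested lists holding 0..prod(shape)-1 in row-major (C) order."""
    total = reduce(mul, shape, 1)
    flat = list(range(total))

    def build(values, dims):
        if len(dims) == 1:
            return values
        step = len(values) // dims[0]
        return [build(values[i * step:(i + 1) * step], dims[1:])
                for i in range(dims[0])]

    return build(flat, list(shape))


grid = arange_nd((2, 3))
cube = arange_nd((2, 2, 2))
```
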

spartan.expr.builtins.argmax(x, axis=None)[source]

Compute argmax over axis.

See numpy.ndarray.argmax.

Parameters:
  • x – Expr to compute a maximum over.
  • axis – Axis (integer or None).
spartan.expr.builtins.argmin(x, axis=None)[source]

Compute argmin over axis.

See numpy.ndarray.argmin.

Parameters:
  • x – Expr to compute a minimum over.
  • axis – Axis (integer or None).
spartan.expr.builtins.astype(x, dtype)[source]

Convert x to a new dtype.

See numpy.ndarray.astype.

Parameters:
  • x – Expr or DistArray
  • dtype
spartan.expr.builtins.count_nonzero(array, axis=None)[source]

Return the number of nonzero values in the axis of the array.

Parameters:
  • array – DistArray or Expr.
  • axis – the axis to count
Return type:

np.int64

spartan.expr.builtins.count_zero(array, axis=None)[source]

Return the number of zero values in the axis of the array.

Parameters:
  • array – DistArray or Expr.
  • axis – the axis to count
Return type:

np.int64

spartan.expr.builtins.diag(array)[source]

Create a diagonal array with the given data on the diagonal. The resulting shape is array.shape[0] x array.shape[0].

Parameters:array – The data to place on the diagonal.
spartan.expr.builtins.exp(v)[source]
spartan.expr.builtins.ln(v)[source]
spartan.expr.builtins.log(v)[source]
spartan.expr.builtins.mean(x, axis=None)[source]

Compute the mean of x over axis.

See numpy.ndarray.mean.

Parameters:
  • x – Expr
  • axis – integer or None
spartan.expr.builtins.multiply(a, b)[source]
spartan.expr.builtins.norm_cdf(v)[source]
spartan.expr.builtins.normalize(array, axis=None)[source]

Normalize the values of array over axis. After normalization sum(array, axis) will be equal to 1.

Parameters:
  • array (Expr) – Array to normalize.
  • axis (int) – Either an integer or None.
Returns:

`Expr` – Normalized array.
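
The normalization property (sums equal to 1 after the call) can be checked on plain lists. The two helpers below are hypothetical and cover only a 2-D list-of-lists: normalize_rows corresponds to normalizing along the last axis (axis=1 in NumPy terms), and normalize_all to axis=None. Inputs are assumed to have non-zero sums.

```python
def normalize_rows(matrix):
    """Scale each row of a list-of-lists so that the row sums to 1."""
    result = []
    for row in matrix:
        total = sum(row)
        result.append([value / total for value in row])
    return result


def normalize_all(matrix):
    """Scale every entry so that the whole matrix sums to 1 (axis=None)."""
    total = sum(sum(row) for row in matrix)
    return [[value / total for value in row] for row in matrix]


data = [[1.0, 3.0], [2.0, 2.0]]
by_row = normalize_rows(data)
overall = normalize_all(data)
```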

spartan.expr.builtins.ones(shape, dtype=<type 'float'>, tile_hint=None)[source]

Create a distributed array over the given shape and dtype, filled with ones.

Parameters:
  • shape
  • dtype
  • tile_hint
Return type:

Expr

spartan.expr.builtins.rand(*shape, **kw)[source]

Return a random array sampled from the uniform distribution on [0, 1).

Parameters:tile_hint – A tuple indicating the desired tile shape for this array.
spartan.expr.builtins.randn(*shape, **kw)[source]

Return a random array sampled from the standard normal distribution.

Parameters:tile_hint – A tuple indicating the desired tile shape for this array.
spartan.expr.builtins.ravel(v)[source]

“Ravel” v to a one-dimensional array of shape (size(v),).

See numpy.ndarray.ravel.

Parameters:v – Expr or DistArray

spartan.expr.builtins.scan(array, reduce_fn=None, scan_fn=None, accum_fn=None, axis=None)[source]

Scan array over axis.

Parameters:
  • array – The array to scan.
  • reduce_fn – local reduce function
  • scan_fn – scan function
  • accum_fn – accumulate function
  • axis – Either an integer or None.
spartan.expr.builtins.size(x, axis=None)[source]

Return the size (product of the size of all axes) of x.

See numpy.ndarray.size.

Parameters:x – Expr to compute the size of.
spartan.expr.builtins.sparse_diagonal(shape, dtype=<type 'numpy.float32'>, tile_hint=None)[source]
spartan.expr.builtins.sparse_empty(shape, dtype=<type 'numpy.float32'>, tile_hint=None)[source]

Return an empty sparse array of the given shape.

Parameters:
  • shape – tuple. Shape of the resulting array.
  • dtype – np.dtype
  • tile_hint – A tuple indicating the desired tile shape for this array.
spartan.expr.builtins.sparse_rand(shape, density=0.001, format='lil', dtype=<type 'numpy.float32'>, tile_hint=None)[source]

Make a distributed sparse random array.

Random values are chosen from the uniform distribution on [0, 1).

Parameters:
  • density (float) – Fraction of values to be filled
  • format (string) – Sparse tile format (lil, coo, csr, csc).
  • dtype (np.dtype) – Datatype of array.
  • tile_hint (tuple or None) – Shape of array tiles.
Returns:

Expr

spartan.expr.builtins.sqrt(v)[source]
spartan.expr.builtins.square(v)[source]
spartan.expr.builtins.sub(a, b)[source]
spartan.expr.builtins.sum(x, axis=None)[source]

Sum x over axis.

Parameters:
  • x – The array to sum.
  • axis – Either an integer or None.
spartan.expr.builtins.tocoo(array)[source]

Convert array to use COO (coordinate) format for tiles.

Parameters:array – Sparse Expr.
Return type:A new array in COO format.
spartan.expr.builtins.zeros(shape, dtype=<type 'float'>, tile_hint=None)[source]

Create a distributed array over the given shape and dtype, filled with zeros.

Parameters:
  • shape
  • dtype
  • tile_hint
Return type:

Expr

checkpoint Module

class spartan.expr.checkpoint.CheckpointExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

load_data(cached_result)[source]
members = ['children', 'path', 'mode', 'ready', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'CheckpointExpr'
spartan.expr.checkpoint.checkpoint(x, mode='disk')[source]

Make a checkpoint for x.

Parameters:
  • x – numpy.ndarray or Expr
  • mode – ‘disk’ or ‘replica’
Return type:

Expr

dot Module

Dot expr.

class spartan.expr.dot.DotExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

members = ['matrix_a', 'matrix_b', 'tile_hint', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'DotExpr'
spartan.expr.dot.dot(a, b, tile_hint=None)[source]

Compute the dot product (matrix multiplication) of 2 arrays.

Parameters:
Return type:

Expr

fio Module

File I/O for spartan

These include –

  • load/save

    The format is based on npy format, see https://github.com/numpy/numpy/blob/master/doc/neps/npy-format.txt for the detail.

    If the prefix for files is “foo”, a file, foo_distarray.spf, holds the array-level information, including array_shape and tiles. For each tile, Spartan uses one format-modified npy file, foo_$ul_$lr.spf. If the tile is a sparse tile, one additional npz file, foo_$ul_$lr.npz, holds the data (as a coo_matrix). All files are in the prefix directory.

    Spartan adds several extra keys to the dictionary field of the npy file to record information about the tile. If the tile is sparse, the data field is empty, since all data is stored in the accompanying npz file.

  • pickle/unpickle

    These simply use cPickle to dump to and load from files, so they have no format compatibility issues.

spartan.expr.fio.load(prefix, path='.', iszip=False)[source]

Load prefix to a new array.

This expression is lazy and returns an Expr. Returns a new array with extents/tiles from prefix.

Parameters:
  • prefix – Prefix of all file names
  • path – Path to store the directory prefix
  • iszip – Zip all files
spartan.expr.fio.partial_load(extents, prefix, path='.', iszip=False)[source]

Load some tiles from prefix to some workers.

This expression is not lazy and returns tile_id(s).

Parameters:
  • extents – A dictionary which contains extents->workers
  • prefix – Prefix of all file names
  • path – Path to store the directory prefix
  • iszip – Zip all files
Return type:

A dictionary which contains extents->tile_id

spartan.expr.fio.partial_unpickle(extents, prefix, path='.', iszip=False)[source]

Unpickle some tiles from prefix to some workers.

This expression is not lazy and returns tile_id(s).

Parameters:
  • extents – A dictionary which contains extents->workers
  • prefix – Prefix of all file names
  • path – Path to store the directory prefix
  • iszip – Zip all files
Return type:

A dictionary which contains extents->tile_ids

spartan.expr.fio.pickle(array, prefix, path='.', iszip=False)[source]

Save array to prefix_xxx. Use cPickle.

This expression is not lazy and returns True on success. Returns the number of saved files.

Parameters:
  • array – Expr or distarray
  • path – Path to store the directory prefix
  • prefix – Prefix of all file names
  • iszip – Zip all files
spartan.expr.fio.save(array, prefix, path='.', iszip=False)[source]

Save array to prefix_xxx.

This expression is not lazy and returns True on success. Returns the number of saved files (not including _dist.spf).

Parameters:
  • path – Path to store the directory prefix
  • array – Expr or distarray
  • prefix – Prefix of all file names
  • iszip – Zip files or not
spartan.expr.fio.save_filename(**kw)[source]

Return the path to write a save file to, based on the input parameters.

spartan.expr.fio.unpickle(prefix, path='.', iszip=False)[source]

Load prefix_xxx to a new array. Use cPickle.

This expression is lazy and returns an Expr. Returns a new array with extents/tiles from fn.

Parameters:
  • prefix – Prefix of all file names
  • path – Path to store the directory prefix
  • iszip – Zip all files

index Module

Indexing expressions (slicing and filtering).

These are generated by a __getitem__ call to an Expr (e.g. x[0:10, 20:30] returns an IndexExpr).

Theoretically we could determine at the call site whether something is a slice (indexing by a tuple) or a boolean/index array, but for the moment both are managed by the IndexExpr.
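
The distinction between the two kinds of index can be seen from what Python passes to __getitem__. The Indexable class below is a hypothetical wrapper that only classifies its argument; it does not build an IndexExpr and is not how Spartan dispatches internally.

```python
class Indexable:
    """Hypothetical wrapper that records what kind of index it received."""

    def __init__(self, data):
        self.data = list(data)

    def __getitem__(self, idx):
        if isinstance(idx, (int, slice, tuple)):
            kind = "slice"
        else:
            kind = "array"   # e.g. a list of booleans or integers
        return kind, idx


x = Indexable(range(100))
kind_a, idx_a = x[0:10]
kind_b, idx_b = x[0:10, 20:30]
kind_c, idx_c = x[[True, False, True]]
```

A multi-axis slice such as x[0:10, 20:30] arrives as a tuple of slice objects, while a boolean or integer index arrives as the array-like object itself.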

class spartan.expr.index.IndexExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

Represents an indexing operation.

src

Expr to index into

idx

tuple (for slicing) or Expr (for bool/integer indexing)

compute_shape()[source]
label()[source]
members = ['src', 'idx', 'expr_id', 'stack_trace']
node_init()[source]
node_items(node)
node_type = 'IndexExpr'
spartan.expr.index.eval_index(ctx, src, idx)[source]

Index an array by another array (boolean or integer).

Parameters:
  • ctx – BlobCtx
  • src – DistArray to read from
  • idx – DistArray of bool or integer index.
Returns:

DistArray – The result of src[idx]

spartan.expr.index.eval_slice(ctx, src, idx)[source]

Index an array by a slice.

Parameters:
  • ctx – BlobCtx
  • src – DistArray to read from
  • idx – int or tuple
Returns:

Slice – The result of src[idx]

local Module

Local expressions.

Briefly: global expressions are over arrays, and local expressions are over tiles.

LocalExpr nodes have dependencies and can be chained together; this allows us to construct local DAGs when optimizing, which can then be executed or converted to Parakeet code.

exception spartan.expr.local.CodegenException[source]

Bases: exceptions.Exception

class spartan.expr.local.FnCallExpr(*args, **kw)[source]

Bases: spartan.expr.local.LocalExpr

Evaluate a function call.

Dependencies that are variable should be specified via the deps attribute, and will be evaluated and supplied to the function when called.

Constants (axis of a reduction, datatype, etc.) can be supplied via the kw argument.

evaluate(ctx)[source]
fn_name()[source]
members = ['kw', 'fn', 'pretty_fn', 'deps']
node_init()[source]
node_items(node)
node_type = 'FnCallExpr'
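
The split between variable dependencies (deps) and constant keyword arguments (kw) can be sketched with two toy node types. Input, FnCall, and scaled_sum are hypothetical names; the evaluation context is modelled as a plain dict, which is an assumption and not the LocalCtx class.

```python
class Input:
    """Hypothetical leaf: looks its value up in the evaluation context."""

    def __init__(self, name):
        self.name = name

    def evaluate(self, ctx):
        return ctx[self.name]


class FnCall:
    """Hypothetical call node: variable deps plus constant keyword args."""

    def __init__(self, fn, deps, kw=None):
        self.fn = fn
        self.deps = deps
        self.kw = kw or {}

    def evaluate(self, ctx):
        # Evaluate dependencies first, then pass them positionally.
        args = [dep.evaluate(ctx) for dep in self.deps]
        return self.fn(*args, **self.kw)


def scaled_sum(a, b, scale=1):
    return (a + b) * scale


node = FnCall(scaled_sum, [Input("x"), Input("y")], kw={"scale": 10})
# Nodes chain: the output of one call feeds the next.
chained = FnCall(abs, [FnCall(lambda v: -v, [node])])
value = chained.evaluate({"x": 2, "y": 3})
```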
class spartan.expr.local.LocalCtx(*args, **kw)[source]

Bases: object

members = ['inputs']
node_items(node)
node_type = 'LocalCtx'
class spartan.expr.local.LocalExpr(*args, **kw)[source]

Bases: object

Represents an internal operation to be performed in the context of a tile.

add_dep(v)[source]
input_names()[source]
members = ['deps']
node_init()[source]
node_items(node)
node_type = 'LocalExpr'
class spartan.expr.local.LocalInput(*args, **kw)[source]

Bases: spartan.expr.local.LocalExpr

An externally supplied input.

evaluate(ctx)[source]
input_names()[source]
members = ['idx', 'deps']
node_init()[source]
node_items(node)
node_type = 'LocalInput'
class spartan.expr.local.LocalMapExpr(*args, **kw)[source]

Bases: spartan.expr.local.FnCallExpr

members = ['kw', 'fn', 'pretty_fn', 'deps']
node_items(node)
node_type = 'LocalMapExpr'
class spartan.expr.local.LocalReduceExpr(*args, **kw)[source]

Bases: spartan.expr.local.FnCallExpr

members = ['kw', 'fn', 'pretty_fn', 'deps']
node_items(node)
node_type = 'LocalReduceExpr'
class spartan.expr.local.ParakeetExpr(*args, **kw)[source]

Bases: spartan.expr.local.LocalExpr

evaluate(ctx)[source]
members = ['deps', 'source']
node_items(node)
node_type = 'ParakeetExpr'
spartan.expr.local.make_var()[source]

Return a new unique key for use as a variable name.

map Module

Implementation of the map operation.

Maps encompass most of the common Numpy arithmetic operators and element-wise operations. For instance a + b is translated to:

map((a, b), lambda x, y: x + y)

Inputs to a map are broadcast up to have the same shape (this is the same behavior as Numpy broadcasting).
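
The translation of a + b into a map call can be imitated with operator overloading on a toy lazy type. Lazy, constant, and lazy_map are hypothetical names operating on equal-length Python lists; this sketch performs no broadcasting and no tiling.

```python
class Lazy:
    """Toy lazy array over plain Python lists; not Spartan's Expr."""

    def __init__(self, thunk):
        self.thunk = thunk

    def __add__(self, other):
        return lazy_map((self, other), lambda x, y: x + y)

    def evaluate(self):
        return self.thunk()


def constant(values):
    return Lazy(lambda: list(values))


def lazy_map(inputs, fn):
    # Nothing is computed until evaluate() is called.
    def run():
        columns = [inp.evaluate() for inp in inputs]
        return [fn(*items) for items in zip(*columns)]
    return Lazy(run)


a = constant([1, 2, 3])
b = constant([10, 20, 30])
expr = a + b            # builds a node, does no arithmetic yet
result = expr.evaluate()
```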

class spartan.expr.map.MapExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

Represents mapping an operator over one or more inputs.

Variables:
  • op – A LocalExpr to evaluate on the input(s)
  • children – One or more Expr to map over.
compute_shape()[source]

MapTiles retains the shape of inputs.

Broadcasting results in a map taking the shape of the largest input.

label()[source]
members = ['children', 'op', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'MapExpr'
spartan.expr.map.map(inputs, fn, numpy_expr=None, fn_kw=None)[source]

Evaluate fn over each tile of the input.

Parameters:
  • inputs (list) – List of Exprs to map over
  • fn (function) – Mapper function. Should take a Numpy array as an input and return a new Numpy array.
  • fn_kw (dict) – Optional. Keyword arguments to pass to fn.
Returns:

MapExpr – An expression node representing mapping fn over inputs.

spartan.expr.map.tile_mapper(ex, children, op)[source]

Run for each tile of a Map operation.

Evaluate the map function on the local tile and return a result.

Parameters:
  • ex – Extent
  • children – Input arrays for this operation.
  • op – LocalExpr to evaluate.

ndarray Module

class spartan.expr.ndarray.NdArrayExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

compute_shape()[source]
dependencies()[source]
label()[source]
members = ['_shape', 'sparse', 'dtype', 'tile_hint', 'reduce_fn', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'NdArrayExpr'
visit(visitor)[source]
spartan.expr.ndarray.ndarray(shape, dtype=<type 'float'>, tile_hint=None, reduce_fn=None, sparse=False)[source]

Lazily create a new distributed array.

Parameters:
  • shape
  • dtype
  • tile_hint

optimize Module

Optimizations over an expression graph.

class spartan.expr.optimize.CollapsedCachedExpressions[source]

Bases: spartan.expr.optimize.OptimizePass

Replace expressions which have already been evaluated with a simple value expression.

This results in simpler local expressions when evaluating iterative programs.

before = [<class 'spartan.expr.optimize.MapMapFusion'>, <class 'spartan.expr.optimize.ReduceMapFusion'>]
name = 'collapse_cached'
visit_default(expr)[source]
class spartan.expr.optimize.MapMapFusion[source]

Bases: spartan.expr.optimize.OptimizePass

Fold sequences of Map operations together.

map(f, map(g, map(h, x))) -> map(f . g . h, x)

name = 'map_fusion'
visit_MapExpr(expr)[source]
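
The rewrite map(f, map(g, map(h, x))) -> map(f . g . h, x) rests on ordinary function composition. The sketch below shows that equivalence with Python's built-in map on a list; compose is a hypothetical helper, and the real pass rewrites expression nodes, not Python iterators.

```python
from functools import reduce


def compose(*fns):
    """compose(f, g, h)(x) == f(g(h(x)))."""
    return reduce(lambda outer, inner: lambda x: outer(inner(x)), fns)


f = lambda x: x + 1
g = lambda x: x * 2
h = lambda x: x - 3
data = [5, 6, 7]

unfused = list(map(f, map(g, map(h, data))))   # three nested map stages
fused = list(map(compose(f, g, h), data))      # a single map stage
```

In a distributed setting the fused form matters because each separate map stage would otherwise produce an intermediate array.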
class spartan.expr.optimize.OptimizePass[source]

Bases: object

after = []
before = []
visit(op)[source]
class spartan.expr.optimize.ParakeetGeneration[source]

Bases: spartan.expr.optimize.OptimizePass

Replace local map/reduce operations with an equivalent parakeet function definition.

after = [<class 'spartan.expr.optimize.MapMapFusion'>, <class 'spartan.expr.optimize.ReduceMapFusion'>]
name = 'parakeet_gen'
visit_MapExpr(expr)[source]
class spartan.expr.optimize.ReduceMapFusion[source]

Bases: spartan.expr.optimize.OptimizePass

Fuse reduce(f, map(g, X)) -> reduce(f . g, X)

after = [<class 'spartan.expr.optimize.MapMapFusion'>]
name = 'reduce_fusion'
visit_ReduceExpr(expr)[source]
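
The identity behind reduce(f, map(g, X)) -> reduce(f . g, X) can be checked with functools.reduce. Here fuse_reduce_map is a hypothetical helper that folds the mapper into the reduction step; it illustrates the algebra only, not the pass itself.

```python
from functools import reduce


def fuse_reduce_map(f, g):
    """Return a step function folding g into the reduction step f."""
    return lambda acc, item: f(acc, g(item))


add = lambda acc, v: acc + v
square = lambda v: v * v
data = [1, 2, 3, 4]

unfused = reduce(add, map(square, data), 0)
fused = reduce(fuse_reduce_map(add, square), data, 0)
```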
spartan.expr.optimize.add_optimization(klass, default)[source]
spartan.expr.optimize.apply_pass(klass, dag)[source]
spartan.expr.optimize.codegen(op)[source]

Given a local operation, generate an equivalent parakeet function definition.

spartan.expr.optimize.disable_parakeet(fn)[source]

Disables parakeet optimization for this function.

spartan.expr.optimize.find_modules(op)[source]
spartan.expr.optimize.fusable(v)[source]
spartan.expr.optimize.merge_var(children, k, v)[source]

Add a new expression with key k to the children dictionary.

If k is already in the dictionary, then v must be equal to the current value.
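
A minimal sketch of that contract, assuming a conflict is reported by raising an exception. The name merge_var_sketch and the choice of ValueError are assumptions; the source does not say how the real function signals a mismatch.

```python
def merge_var_sketch(children, k, v):
    """Insert v under key k, refusing to overwrite a different value."""
    if k in children:
        if children[k] != v:
            raise ValueError("conflicting values for %r" % (k,))
    else:
        children[k] = v


children = {}
merge_var_sketch(children, "a", 1)
merge_var_sketch(children, "a", 1)      # same value again: allowed, no change
merge_var_sketch(children, "b", 2)

try:
    merge_var_sketch(children, "a", 99)  # different value for an existing key
    conflict_detected = False
except ValueError:
    conflict_detected = True
```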

spartan.expr.optimize.not_idempotent(fn)[source]

Disable map fusion for fn.

spartan.expr.optimize.optimize(dag)[source]

outer Module

class spartan.expr.outer.OuterProductExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

members = ['children', 'map_fn', 'map_fn_kw', 'reduce_fn', 'reduce_fn_kw', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'OuterProductExpr'
spartan.expr.outer.outer(a, b)[source]
spartan.expr.outer.outer_product(a, b, map_fn, reduce_fn)[source]

Outer (cartesian) product over the tiles of a and b.

map_fn is applied to each pair; reduce_fn is used to combine overlapping outputs.

Parameters:
  • a
  • b

reduce Module

Implementation of the reduction expression.

This supports generic reduce operations such as sum, argmin, argmax, min and max.

class spartan.expr.reduce.ReduceExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

compute_shape()[source]
label()[source]
members = ['children', 'axis', 'dtype_fn', 'op', 'accumulate_fn', 'expr_id', 'stack_trace']
node_init()[source]
node_items(node)
node_type = 'ReduceExpr'
spartan.expr.reduce.reduce(v, axis, dtype_fn, local_reduce_fn, accumulate_fn, fn_kw=None)[source]

Reduce v over axis axis.

The resulting array should have a datatype given by dtype_fn(input).

For each tile of the input local_reduce_fn is called with arguments: (tiledata, axis, extent).

The output is combined using accumulate_fn.

Parameters:
  • v – Expr
  • axis – int or None
  • dtype_fn – Callable: fn(array) -> numpy.dtype
  • local_reduce_fn – Callable: fn(extent, data, axis)
  • accumulate_fn – Callable: fn(old_v, update_v) -> new_v
Return type:

Expr
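The two-phase structure (a local reduction per tile, then accumulation of the partial results) can be sketched on plain lists. tiled_reduce is a hypothetical helper; for brevity its local function receives only the tile data, whereas the real local_reduce_fn also receives the axis and the extent.

```python
def tiled_reduce(tiles, local_reduce_fn, accumulate_fn):
    """Reduce each tile locally, then fold the partial results together."""
    partials = [local_reduce_fn(tile) for tile in tiles]
    result = partials[0]
    for update in partials[1:]:
        result = accumulate_fn(result, update)
    return result


tiles = [[1, 2, 3], [4, 5], [6]]          # stand-ins for tile data
total = tiled_reduce(tiles, sum, lambda old, new: old + new)
largest = tiled_reduce(tiles, max, max)
```

The same shape covers sum, min, and max: only the pair of functions changes.
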

reshape Module

Reshape operation and expr.

class spartan.expr.reshape.Reshape(base, shape, tile_hint=None)[source]

Bases: spartan.array.distarray.DistArray

Reshape the underlying array base.

Reshape does not create a copy of the base array. Instead the fetch method is overridden:

  1. Calculate the underlying extent containing the requested extent.
  2. Fetch the underlying extent.
  3. Trim the fetched tile and reshape to the requested tile.

To support foreach_tile() and tile_shape() (used by dot), Reshape needs a blob_id-to-extent map and the shape of the extents. Therefore, Reshape creates a distarray (shape_array), but does not initialize its content.

fetch(ex)[source]
foreach_tile(mapper_fn, kw=None)[source]
tile_shape()[source]
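
The first step of the overridden fetch, finding an underlying extent that contains the requested one, comes down to row-major index arithmetic. The sketch below assumes a 2-D base array and returns a covering range of base rows; ravel_index, unravel_index, and base_rows_for are hypothetical helpers, not the Reshape implementation.

```python
def ravel_index(index, shape):
    """Row-major flat offset of a multi-dimensional index."""
    flat = 0
    for i, dim in zip(index, shape):
        flat = flat * dim + i
    return flat


def unravel_index(flat, shape):
    """Inverse of ravel_index."""
    index = []
    for dim in reversed(shape):
        index.append(flat % dim)
        flat //= dim
    return tuple(reversed(index))


def base_rows_for(ul, lr, new_shape, base_shape):
    """Rows of a 2-D base array covering new-shape region [ul, lr)."""
    first = ravel_index(ul, new_shape)
    last = ravel_index([i - 1 for i in lr], new_shape)   # last element, inclusive
    row_start = unravel_index(first, base_shape)[0]
    row_stop = unravel_index(last, base_shape)[0] + 1
    return row_start, row_stop


# A (4, 6) base viewed as (8, 3); request rows 2..4 of the reshaped view.
rows = base_rows_for((2, 0), (4, 3), new_shape=(8, 3), base_shape=(4, 6))
```

The covering range may include elements outside the request, which is why the fetched tile is trimmed before being reshaped.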
class spartan.expr.reshape.ReshapeExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

members = ['array', 'new_shape', 'tile_hint', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'ReshapeExpr'
spartan.expr.reshape.reshape(array, new_shape, tile_hint=None)[source]

Reshape/retile array.

Parameters:
  • array – Expr to reshape.
  • new_shape (tuple) – Target shape.
  • tile_hint (tuple)
Returns:

`ReshapeExpr` – Reshaped array.

shuffle Module

class spartan.expr.shuffle.ShuffleExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

members = ['array', 'map_fn', 'target', 'tile_hint', 'fn_kw', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'ShuffleExpr'
spartan.expr.shuffle.notarget_mapper(ex, array=None, map_fn=None, source=None, fn_kw=None)[source]

Kernel function invoked during shuffle.

Runs map_fn over a single tile of the source array.

Parameters:
  • ex (Extent) – Extent being processed.
  • map_fn (function) – Function passed into shuffle.
  • source (DistArray) – DistArray being mapped over.
  • fn_kw (dict) – Keyword arguments for map_fn.
Returns:

LocalKernelResult – List of (new_extent, new_tile_id).

spartan.expr.shuffle.shuffle(v, fn, tile_hint=None, target=None, kw=None)[source]

Evaluate fn over each extent of v.

Parameters:
  • v (Expr or DistArray) – Source array to map over.
  • fn (function) – Function from (DistArray, extent, **kw) to list of (new_extent, new_data)
  • target (Expr) – Optional. If specified, the output of fn will be written into target.
  • kw (dict) – Optional. Keyword arguments to pass to fn.
Returns:

ShuffleExpr

spartan.expr.shuffle.target_mapper(ex, map_fn=None, source=None, target=None, fn_kw=None)[source]

Kernel function invoked during shuffle.

Runs map_fn over a single tile of the source array.

Parameters:
  • ex (Extent) – Extent being processed.
  • map_fn (function) – Function passed into shuffle.
  • source (DistArray) – DistArray being mapped over.
  • target (DistArray) – Array being written to.
  • fn_kw (dict) – Keyword arguments for map_fn.
Returns:

LocalKernelResult – No result data (all output is written to target).

stencil Module

spartan.expr.stencil.jit(fn)[source]
spartan.expr.stencil.maxpool(images, pool_size=2, stride=2)[source]
spartan.expr.stencil.stencil(images, filters, stride=1)[source]
spartan.expr.stencil.stencil_mapper(array, ex, filters=None, images=None, target_shape=None)[source]
spartan.expr.stencil.tiles_like(array, target_shape)[source]

transpose Module

Transpose operation and expr.

class spartan.expr.transpose.Transpose(base)[source]

Bases: spartan.array.distarray.DistArray

Transpose the underlying array base.

Transpose does not create a copy of the base array. Instead the fetch method is overridden: the dimensions for the requested extent are reversed and the “transposed” request is sent to the underlying base array.

Transpose supports tile_shape() by returning the reversed base.tile_shape(). To support foreach_tile(), Transpose reports base’s tiles and reverses the extents’ shape in _tile_mapper().

fetch(ex)[source]
foreach_tile(mapper_fn, kw=None)[source]
tile_shape()[source]
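
The reversed-extent trick can be sketched for the 2-D case with nested lists. fetch and fetch_transposed are hypothetical helpers; extents are modelled as (upper-left, lower-right) index tuples with an exclusive lower-right corner, which is an assumption about the extent convention.

```python
def fetch(base, ul, lr):
    """Read the region [ul, lr) from a 2-D list-of-lists."""
    return [row[ul[1]:lr[1]] for row in base[ul[0]:lr[0]]]


def fetch_transposed(base, ul, lr):
    """Read region [ul, lr) of the transpose without copying base."""
    # Reverse the extent's dimensions to address the underlying array.
    tile = fetch(base, ul[::-1], lr[::-1])
    # Transpose only the small fetched tile.
    return [list(col) for col in zip(*tile)]


base = [[1, 2, 3],
        [4, 5, 6]]                      # shape (2, 3); transpose is (3, 2)
tile = fetch_transposed(base, (1, 0), (3, 2))
```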
class spartan.expr.transpose.TransposeExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

members = ['array', 'tile_hint', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'TransposeExpr'
spartan.expr.transpose.transpose(array, tile_hint=None)[source]

Transpose array.

Parameters:array – Expr to transpose.
Returns:`TransposeExpr` – Transposed array.

write_array Module

Operations for updating slices of arrays.

To preserve the non-mutation semantics required for optimizations to be correct, writing to an array should not actually mutate the original array, but should instead create a new array with the appropriate region updated. This code currently mutates arrays in place, and therefore should be used with care.
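
The difference between the two semantics can be seen on a plain list that is shared with another reference, standing in for a cached result. write_copy and write_in_place are hypothetical helpers and do not reflect the signature of write in this module.

```python
def write_copy(array, index, data):
    """Return a new list with array[index] replaced; array is untouched."""
    updated = list(array)
    updated[index] = data
    return updated


def write_in_place(array, index, data):
    """Mutate array directly, as the module description says the code does."""
    array[index] = data
    return array


original = [0, 0, 0, 0]
cached_view = original                  # e.g. a cached result elsewhere

fresh = write_copy(original, slice(1, 3), [7, 8])
snapshot_after_copy = list(cached_view)

write_in_place(original, slice(1, 3), [7, 8])
snapshot_after_mutation = list(cached_view)
```

After the in-place write, anything still holding cached_view sees the new values, which is the hazard for optimizations that assume expressions are immutable.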

class spartan.expr.write_array.WriteArrayExpr(*args, **kw)[source]

Bases: spartan.expr.base.Expr

members = ['array', 'src_slices', 'data', 'dst_slices', 'expr_id', 'stack_trace']
node_items(node)
node_type = 'WriteArrayExpr'
spartan.expr.write_array.from_file(fn, file_type='numpy')[source]

Make a distarray from a file. Currently supports npy/npz.

Parameters:fn – file name
Return type:Expr
spartan.expr.write_array.from_numpy(npa)[source]

Make a distarray from a numpy array.

Parameters:npa – numpy.ndarray
Return type:Expr
spartan.expr.write_array.write(array, src_slices, data, dst_slices)[source]

array[src_slices] = data[dst_slices]

Parameters:
  • array – Expr or distarray
  • src_slices – slices for array
  • data – data
  • dst_slices – slices for data
Return type:

Expr

rpc Package

rpc Package

spartan.rpc.connect(host, port)[source]
spartan.rpc.listen(host, port)[source]
spartan.rpc.listen_on_random_port(host)[source]

common Module

Simple RPC library.

The Client and Server classes here work with sockets which should implement the Socket interface.

class spartan.rpc.common.Client(socket)[source]

Bases: object

addr()[source]
close()[source]
handle_read(socket)[source]
send(method, request, timeout)[source]
class spartan.rpc.common.FnFuture(future, fn)[source]

Bases: object

Chain fn to the given future.

self.wait() returns fn(future.wait()).

wait()[source]
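
The chaining rule, wait() returning fn(future.wait()), can be sketched with two small classes. ReadyFuture and ChainedFuture are hypothetical; the wrapped future here already holds its value, so no RPC or timeout handling is modelled.

```python
class ReadyFuture:
    """Hypothetical future that already holds its result."""

    def __init__(self, value):
        self.value = value

    def wait(self):
        return self.value


class ChainedFuture:
    """Applies fn to the wrapped future's result when waited on."""

    def __init__(self, future, fn):
        self.future = future
        self.fn = fn

    def wait(self):
        return self.fn(self.future.wait())


reply = ReadyFuture({"tile_id": 7})
tile_id = ChainedFuture(reply, lambda resp: resp["tile_id"]).wait()
```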
class spartan.rpc.common.Future(addr, rpc_id, timeout=None)[source]

Bases: object

done(result=None)[source]
elapsed_time()[source]
on_finished(fn)[source]
timed_out()[source]
wait()[source]
class spartan.rpc.common.FutureGroup[source]

Bases: list

wait()[source]
class spartan.rpc.common.Group[source]

Bases: tuple

class spartan.rpc.common.PendingRequest(socket, rpc_id)[source]

Bases: object

An outstanding RPC request on the server.

Call done(result) when finished to send result back to client.

done(result=None)[source]
exception()[source]
wait()[source]
class spartan.rpc.common.PickledData(*args, **kw)[source]

Bases: object

Helper class: indicates that this message has already been pickled, and should be sent as is, rather than being re-pickled.

members = ['data']
node_items(node)
node_type = 'PickledData'
class spartan.rpc.common.ProxyMethod(client, method)[source]

Bases: object

class spartan.rpc.common.RPCException(*args, **kw)[source]

Bases: object

members = ['py_exc']
node_items(node)
node_type = 'RPCException'
exception spartan.rpc.common.RemoteException(tb)[source]

Bases: exceptions.Exception

Wrap an uncaught remote exception.

class spartan.rpc.common.Server(socket)[source]

Bases: object

addr[source]
handle_read(socket)[source]
register_method(name, fn)[source]
register_object(obj)[source]
serve()[source]
serve_nonblock()[source]
shutdown()[source]
timings()[source]
class spartan.rpc.common.SocketBase[source]

Bases: object

close()[source]
connect()[source]
flush()[source]
recv()[source]
register_handler(handler)[source]

A handler() is called in response to read requests.

send(blob)[source]
exception spartan.rpc.common.TimeoutException(tb)[source]

Bases: exceptions.Exception

Wrap a timeout exception.

spartan.rpc.common.capture_exception(exc_info=None)[source]
spartan.rpc.common.forall(clients, method, request, timeout=None)[source]

Invoke method with request for each client in clients.

request is only serialized once, so this is more efficient when targeting multiple workers with the same data.

Returns a future wrapping all of the requests.
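
The serialize-once idea can be sketched with pickle and a client that just records what it is given. RecordingClient, send_raw, and broadcast are hypothetical names; the real function sends over RPC sockets and returns a future, neither of which is modelled here.

```python
import pickle


class RecordingClient:
    """Hypothetical client that stores the raw bytes it is asked to send."""

    def __init__(self):
        self.sent = []

    def send_raw(self, method, payload):
        self.sent.append((method, payload))
        return len(payload)


def broadcast(clients, method, request):
    # Serialize once, then reuse the same bytes for every client.
    payload = pickle.dumps(request)
    return [client.send_raw(method, payload) for client in clients]


clients = [RecordingClient() for _ in range(3)]
sizes = broadcast(clients, "get_tile", {"tile_id": 7})
```

Every client receives the very same bytes object, so the serialization cost is paid once regardless of how many workers are targeted.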

spartan.rpc.common.read(f)[source]
spartan.rpc.common.serialize(obj)[source]
spartan.rpc.common.serialize_to(obj, writer)[source]
spartan.rpc.common.set_default_timeout(seconds)[source]
spartan.rpc.common.wait_for_all(futures)[source]

zeromq Module

ZeroMQ socket implementation.

class spartan.rpc.zeromq.ServerSocket(ctx, sock_type, hostport)[source]

Bases: spartan.rpc.zeromq.Socket

bind()[source]
handle_read(socket)[source]
listen()[source]
send(msg)[source]

Send msg to a remote client.

Parameters:msg – Group, with the first element being the destination to send to.
zmq()[source]
class spartan.rpc.zeromq.Socket(ctx, sock_type, hostport)[source]

Bases: spartan.rpc.common.SocketBase

addr
close(*args)[source]
closed()[source]
connect()[source]
handle_close()[source]
handle_read(socket)[source]
handle_write()[source]
host[source]
port[source]
recv()[source]
send(msg)[source]
zmq()[source]
class spartan.rpc.zeromq.StubSocket(source, socket, data)[source]

Bases: spartan.rpc.common.SocketBase

Handles a single read from a client

addr[source]
recv()[source]
send(req)[source]
class spartan.rpc.zeromq.ZMQPoller[source]

Bases: threading.Thread

add(socket, direction)[source]
disable_profiling()[source]
enable_profiling()[source]
modify(socket, direction)[source]
queue_close(socket)[source]

Execute socket.handle_close() from within the polling thread.

stop()[source]
wakeup()[source]
spartan.rpc.zeromq.client_socket(addr)[source]
spartan.rpc.zeromq.in_poll_loop()[source]
spartan.rpc.zeromq.poller()[source]
spartan.rpc.zeromq.server_socket(addr)[source]
spartan.rpc.zeromq.server_socket_random_port(host)[source]
spartan.rpc.zeromq.shutdown()[source]
