Python API
In this tutorial, we illustrate how you can use wavemap’s Python API in your own projects.
Setup
Before you start, make sure you have installed pywavemap. If you installed it in a virtual environment, activate it by running the following command in your terminal:
source <path_to_your_virtual_env>/bin/activate
In your python files, you can then load the API by simply calling:
import pywavemap as wave
Code examples
In the following sections, we provide sample code for common tasks. If you’d like to request examples for additional tasks or contribute new examples, please don’t hesitate to contact us.
Tip
All of the example scripts that follow can be found here.
Mapping
The only requirements for building wavemap maps are a set of depth measurements and a sensor pose (estimate) for each measurement.
We usually use depth measurements from depth cameras or 3D LiDARs, but any source works as long as corresponding projection and measurement models are available. To help you get started quickly, we provide example configs for various sensor setups here. An overview of all the available settings is provided on the parameters page.
Example pipeline
import os
import csv
from PIL import Image as PilImage
import numpy as np

import pywavemap as wave

# Parameters
home_dir = os.path.expanduser('~')
measurement_dir = os.path.join(home_dir,
                               "data/panoptic_mapping/flat_dataset/run2")
output_map_path = os.path.join(home_dir, "your_map.wvmp")

# Create a map
your_map = wave.Map.create({
    "type": "hashed_chunked_wavelet_octree",
    "min_cell_width": {
        "meters": 0.05
    }
})

# Create a measurement integration pipeline
pipeline = wave.Pipeline(your_map)
# Add map operations
pipeline.add_operation({
    "type": "threshold_map",
    "once_every": {
        "seconds": 5.0
    }
})
# Add a measurement integrator
pipeline.add_integrator(
    "my_integrator", {
        "projection_model": {
            "type": "pinhole_camera_projector",
            "width": 640,
            "height": 480,
            "fx": 320.0,
            "fy": 320.0,
            "cx": 320.0,
            "cy": 240.0
        },
        "measurement_model": {
            "type": "continuous_ray",
            "range_sigma": {
                "meters": 0.01
            },
            "scaling_free": 0.2,
            "scaling_occupied": 0.4
        },
        "integration_method": {
            "type": "hashed_chunked_wavelet_integrator",
            "min_range": {
                "meters": 0.1
            },
            "max_range": {
                "meters": 5.0
            }
        },
    })

# Index the input data
ids = []
times = []
stamps_file = os.path.join(measurement_dir, 'timestamps.csv')
if not os.path.isfile(stamps_file):
    print(f"Could not find timestamp file '{stamps_file}'.")
    raise SystemExit
with open(stamps_file) as read_obj:
    csv_reader = csv.reader(read_obj)
    for row in csv_reader:
        # Skip the header row
        if row[0] == "ImageID":
            continue
        ids.append(str(row[0]))
        times.append(float(row[1]) / 1e9)
# Sort the measurement IDs by their timestamps
ids = [x for _, x in sorted(zip(times, ids))]

# Integrate all the measurements
current_index = 0
while True:
    # Check we're not done
    if current_index >= len(ids):
        break

    # Load depth image
    file_path_prefix = os.path.join(measurement_dir, ids[current_index])
    depth_file = file_path_prefix + "_depth.tiff"
    if not os.path.isfile(depth_file):
        print(f"Could not find depth image file '{depth_file}'")
        current_index += 1
        continue
    cv_img = PilImage.open(depth_file)
    image = wave.Image(np.array(cv_img).transpose())

    # Load transform
    pose_file = file_path_prefix + "_pose.txt"
    if not os.path.isfile(pose_file):
        print(f"Could not find pose file '{pose_file}'")
        current_index += 1
        continue
    with open(pose_file) as f:
        pose_data = [float(x) for x in f.read().split()]
        transform = np.eye(4)
        for row in range(4):
            for col in range(4):
                transform[row, col] = pose_data[row * 4 + col]
        pose = wave.Pose(transform)

    # Integrate the depth image
    print(f"Integrating measurement {ids[current_index]}")
    pipeline.run_pipeline(["my_integrator"], wave.PosedImage(pose, image))
    current_index += 1

# Remove map nodes that are no longer needed
your_map.prune()

# Save the map
print(f"Saving map of size {your_map.memory_usage} bytes")
your_map.store(output_map_path)

# Avoids leak warnings on old Python versions with lazy garbage collectors
del pipeline, your_map
Serialization
Next, we show how you can serialize and deserialize common wavemap objects, for example to save and load them from files.
Maps
Wavemap uses a lightweight, efficient binary format to serialize its maps. The same format is used across wavemap’s C++, Python and ROS interfaces. You could therefore, for example, create maps on a robot with ROS and subsequently analyze them in Python.
Binary files
Maps can be saved to disk using
import os
import pywavemap as wave

# Create an empty map for illustration purposes
your_map = wave.Map.create({
    "type": "hashed_chunked_wavelet_octree",
    "min_cell_width": {
        "meters": 0.1
    }
})

# Save the map
user_home = os.path.expanduser('~')
map_path = os.path.join(user_home, "your_map.wvmp")
your_map.store(map_path)
and read with
import os
import pywavemap as wave
# Load the map
user_home = os.path.expanduser('~')
map_path = os.path.join(user_home, "your_map.wvmp")
your_map = wave.Map.load(map_path)
Configs
In the previous mapping pipeline example, the configuration parameters for the map and the measurement integration components were hard-coded. To make your setup more flexible, you can use configuration files. We will demonstrate how to work with YAML files, which is the format we use for wavemap’s example configs. However, pywavemap is flexible and can support any parameter format that can be read into a Python dict.
YAML files
import os
import yaml

import pywavemap as wave


def create_map_from_config(config_file_path):
    """
    Example function that creates a map based on parameters in a YAML file.
    """
    with open(config_file_path) as file:
        try:
            config = yaml.safe_load(file)
        except yaml.YAMLError as exc:
            print(exc)
            return None
    if isinstance(config, dict) and "map" in config.keys():
        return wave.Map.create(config["map"])
    return None


# Provide the path to your config
config_dir = os.path.abspath(
    os.path.join(__file__, "../../../../interfaces/ros1/wavemap_ros/config"))
config_file = os.path.join(config_dir, "wavemap_panoptic_mapping_rgbd.yaml")

# Create the map
your_map = create_map_from_config(config_file)
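Since any format that can be parsed into a Python dict works, the same map parameters could, for example, come from a JSON string or file. The sketch below only parses the config; the resulting dict would then be passed to `wave.Map.create`, as in the YAML example above. The config contents here are illustrative:

```python
import json

# Illustrative JSON config mirroring the structure of the YAML configs
config_json = """
{
  "map": {
    "type": "hashed_chunked_wavelet_octree",
    "min_cell_width": {"meters": 0.05}
  }
}
"""

config = json.loads(config_json)
if isinstance(config, dict) and "map" in config:
    map_params = config["map"]
    # Pass map_params to wave.Map.create(map_params) to build the map
    print(map_params["type"])
```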
Queries
In this section, we show how you can query wavemap maps and classify whether a point or region of interest is occupied.
Node indices
The map models the environment by filling it with cubes of variable sizes, arranged as the nodes of an octree. Node indices are defined as integer [X, Y, Z, height] coordinates, whose XYZ values correspond to the node’s position in the octree’s grid at the given height, or level in the tree. Height 0 corresponds to the map’s maximum resolution, and the grid resolution is halved for each subsequent height level.
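For intuition, the relation between a node's height, its cell width, and the grid coordinates of the node covering a point can be sketched in plain NumPy. The `min_cell_width` value and helper names below are illustrative; pywavemap provides equivalent conversions via `wave.convert`, as used in the later examples:

```python
import numpy as np

# Illustrative value; for a real map, use your_map.min_cell_width
min_cell_width = 0.05  # meters


def node_width(height):
    # The cell width doubles (i.e. the resolution halves) per height level
    return min_cell_width * 2.0**height


def point_to_grid_coords(point, height):
    # XYZ grid coordinates of the node containing `point` at `height`
    return np.floor(np.asarray(point) / node_width(height)).astype(int)


print(node_width(3))  # nodes at height 3 are 0.4 m wide
print(point_to_grid_coords([0.42, 0.51, 0.62], 0))
```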
Fixed resolution
Querying the value of a single node in the highest resolution grid (height=0) can be done as follows.
import numpy as np
import _dummy_objects
# Load a map
your_map = _dummy_objects.example_map()
# Declare the index to query
query_index = np.array([0, 0, 0])
# Query the map's value at the given index
occupancy_log_odds = your_map.get_cell_value(query_index)
print(occupancy_log_odds)
Multi-res averages
It is also possible to query lower resolution nodes, whose values correspond to the average estimated occupancy of the volume they cover.
import numpy as np
import pywavemap as wave
import _dummy_objects
# Load a map
your_map = _dummy_objects.example_map()
# Define the center point and the minimum width of your region of interest
query_point = np.array([0.4, 0.5, 0.6])
query_min_cell_width = 0.5 # in meters
# Compute the index of the smallest node that covers it completely
query_height = wave.convert.cell_width_to_height(query_min_cell_width,
                                                 your_map.min_cell_width)
query_index = wave.convert.point_to_node_index(query_point,
                                               your_map.min_cell_width,
                                               query_height)
# Query the node's average occupancy
occupancy_log_odds = your_map.get_cell_value(query_index)
print(occupancy_log_odds)
Accelerators
If you need to look up multiple node values, we recommend using our batched query functions. These functions deliver significant speedups by utilizing wavemap’s QueryAccelerator.
import numpy as np
import _dummy_objects
# Load a map
your_map = _dummy_objects.example_map()
# Vectorized query for a list of indices at the highest resolution (height 0)
indices = np.random.randint(-100, 100, size=(64 * 64 * 32, 3))
values = your_map.get_cell_values(indices)
print(values)
# Vectorized query for a list of multi-resolution indices (at random heights)
node_heights = np.random.randint(0, 6, size=(64 * 64 * 32, 1))
node_indices = np.concatenate((node_heights, indices), axis=1)
node_values = your_map.get_cell_values(node_indices)
print(node_values)
Note
So far batched queries are only implemented for HashedWaveletOctree maps. We will add support for HashedChunkedWaveletOctree maps in the near future.
Real coordinates
Many applications require occupancy estimates at arbitrary 3D points, with real-valued coordinates. Such estimates are computed by interpolating the map.
Caution
If your query points are expressed in a different coordinate frame than the map, do not forget to transform them into the map frame before you continue.
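As a sketch, a batch of points can be brought into the map frame by applying a 4x4 homogeneous transform before querying. The transform values below are illustrative; in practice the transform would come from your localization system:

```python
import numpy as np

# Hypothetical transform from the sensor frame to the map frame
T_map_sensor = np.eye(4)
T_map_sensor[:3, 3] = [1.0, 2.0, 0.5]  # translation (illustrative values)

# Points expressed in the sensor frame, one per row
points_sensor = np.array([[0.4, 0.5, 0.6],
                          [1.0, 0.0, 0.0]])

# Apply the rotation and translation: p_map = R @ p_sensor + t
points_map = points_sensor @ T_map_sensor[:3, :3].T + T_map_sensor[:3, 3]
print(points_map)
```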
Nearest neighbor interpolation
The simplest form of interpolation looks up the value of the map node closest to the query point.
import numpy as np
from pywavemap import InterpolationMode
import _dummy_objects
# Load a map
your_map = _dummy_objects.example_map()
# Declare the point to query [in map frame]
query_point = np.array([0.4, 0.5, 0.6])
# Query a single point
occupancy_log_odds = your_map.interpolate(query_point,
                                          InterpolationMode.NEAREST)
print(occupancy_log_odds)
# Vectorized query for a list of points
points = np.random.random(size=(64 * 64 * 32, 3))
points_log_odds = your_map.interpolate(points, InterpolationMode.NEAREST)
print(points_log_odds)
Trilinear interpolation
Another option is to linearly interpolate the map along the x, y, and z axes. This method produces cleaner, more accurate results at the cost of being slightly slower, since it needs to query 8 neighboring map nodes.
import numpy as np
from pywavemap import InterpolationMode
import _dummy_objects
# Load a map
your_map = _dummy_objects.example_map()
# Declare the point to query [in map frame]
query_point = np.array([0.4, 0.5, 0.6])
# Query a single point
occupancy_log_odds = your_map.interpolate(query_point,
                                          InterpolationMode.TRILINEAR)
print(occupancy_log_odds)
# Vectorized query for a list of points
points = np.random.random(size=(64 * 64 * 32, 3))
points_log_odds = your_map.interpolate(points, InterpolationMode.TRILINEAR)
print(points_log_odds)
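For intuition, the trilinear blending of the 8 neighboring samples can be sketched in plain NumPy; pywavemap performs the equivalent lookup and weighting internally. This is a standalone illustration on a regular grid, not part of the pywavemap API:

```python
import numpy as np


def trilinear(values, frac):
    # values: 2x2x2 array holding the 8 neighboring samples
    # frac: fractional position [fx, fy, fz] in [0, 1] within that cell
    fx, fy, fz = frac
    # Interpolate along x, then y, then z
    c = values[0] * (1 - fx) + values[1] * fx  # collapses to 2x2
    c = c[0] * (1 - fy) + c[1] * fy            # collapses to length 2
    return c[0] * (1 - fz) + c[1] * fz         # scalar


corners = np.zeros((2, 2, 2))
corners[1, 1, 1] = 1.0
print(trilinear(corners, [0.5, 0.5, 0.5]))  # 0.125
```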
Occupancy classification
Once the estimated occupancy at a node or point has been retrieved, it can be classified as follows.
import numpy as np
import _dummy_objects
# Declare a floating point value representing the occupancy posterior in log
# odds as queried from the map in one of the previous examples
occupancy_log_odds = _dummy_objects.example_occupancy_log_odds()
# A point is considered unobserved if its occupancy posterior is equal to the
# prior. Wavemap assumes that an unobserved point is equally likely to be
# free or occupied. In other words, the prior occupancy probability is 0.5,
# which corresponds to a log odds value of 0.0. Accounting for numerical
# noise, checking whether a point is unobserved can be done as follows:
kUnobservedThreshold = 1e-3
is_unobserved = np.abs(occupancy_log_odds) < kUnobservedThreshold
print(is_unobserved)
# In case you would like to convert log odds into probabilities, we provide
# the following convenience function:
def log_odds_to_probability(log_odds):
    odds = np.exp(log_odds)
    prob = odds / (1.0 + odds)
    return prob


occupancy_probability = log_odds_to_probability(occupancy_log_odds)
print(occupancy_probability)

# To do the opposite
def probability_to_log_odds(probability):
    odds = probability / (1.0 - probability)
    return np.log(odds)


occupancy_log_odds = probability_to_log_odds(occupancy_probability)
print(occupancy_log_odds)
# To classify whether a point is estimated to be occupied or free, you need
# to choose a discrimination threshold. A reasonable default threshold is 0.5
# (probability), which corresponds to 0.0 log odds.
kOccupancyThresholdProb = 0.5
kOccupancyThresholdLogOdds = 0.0
# NOTE: To tailor the threshold, we recommend running wavemap on a dataset
# that is representative of your application and analyzing the Receiver
# Operating Characteristic curve.
# Once a threshold has been chosen, you can either classify in log space
is_occupied = kOccupancyThresholdLogOdds <= occupancy_log_odds
is_free = occupancy_log_odds < kOccupancyThresholdLogOdds
print(is_occupied)
print(is_free)
# Or in probability space
is_occupied = kOccupancyThresholdProb <= occupancy_probability
is_free = occupancy_probability < kOccupancyThresholdProb
print(is_occupied)
print(is_free)