Source code for pyfar.io.io

"""
Read and write objects to disk, read and write WAV files, read SOFA files.

The functions :py:func:`read` and :py:func:`write` allow to save or load
several pyfar objects and other variables. So, e.g., workspaces in notebooks
can be stored. :py:class:`Signal <pyfar.signal.Signal>` objects can be
imported and exported as WAV files using :py:func:`read_wav` and
:py:func:`write_wav`. :py:func:`read_sofa` provides functionality to read the
data stored in a SOFA file.
"""
import scipy.io.wavfile as wavfile
import os.path
import pathlib
import warnings
import numpy as np
import sofa
import zipfile
import io


from pyfar import Signal
from pyfar import Coordinates

from . import _codec as codec
import pyfar.classes.filter as fo


[docs]def read_wav(filename): """ Import a WAV file as :py:class:`~pyfar.classes.audio.Signal` object. Parameters ---------- filename : string, Path Input file. Returns ------- signal : Signal :py:class:`~pyfar.classes.audio.Signal` object containing the audio data from the WAV file. Notes ----- * This function is based on ``scipy.io.wavfile.read``. * 24-bit data cannot be read. """ sampling_rate, data = wavfile.read(filename) signal = Signal(data.T, sampling_rate, domain='time') return signal
[docs]def write_wav(signal, filename, overwrite=True): """ Write a :py:class:`~pyfar.classes.audio.Signal` object as a WAV file to disk. Parameters ---------- signal : Signal Object to be written. filename : string, Path Output file. overwrite : bool Select wether to overwrite the WAV file, if it already exists. The default is ``True``. Notes ----- * Signals are flattened before writing to disk (e.g. a signal with ``cshape = (3, 2)`` will be written to disk as a six channel wav file). * This function is based on ``scipy.io.wavfile.write``. * The bits-per-sample and PCM/float is determined by the data-type, see documentation for ``scipy.io.wavfile.write``. """ sampling_rate = signal.sampling_rate data = signal.time # Reshape to 2D data = data.reshape(-1, data.shape[-1]) if len(signal.cshape) != 1: warnings.warn(f"Signal flattened to {data.shape[0]} channels.") # .wav file extension filename = pathlib.Path(filename).with_suffix('.wav') # Check if file exists and for overwrite if overwrite is False and os.path.isfile(filename): raise FileExistsError( "File already exists," "use overwrite option to disable error.") else: wavfile.write(filename, sampling_rate, data.T)
[docs]def read_sofa(filename): """ Import a SOFA file as :py:class:`~pyfar.classes.audio.Signal` object. Parameters ---------- filename : string, Path Input SOFA file (cf. [#]_, [#]_). Returns ------- signal : Signal :py:class:`~pyfar.classes.audio.Signal` object containing the data stored in `SOFA_Object.Data.IR`. `cshape` is equal to ``(number of measurements, number of receivers)``. source_coordinates : Coordinates Coordinates object containing the data stored in `SOFA_object.SourcePosition`. The domain, convention and unit are automatically matched. receiver_coordinates : Coordinates Coordinates object containing the data stored in `SOFA_object.RecevierPosition`. The domain, convention and unit are automatically matched. Notes ----- * This function is based on the python-sofa [#]_. * Currently, only SOFA files of `DataType` ``FIR`` are supported. References ---------- .. [#] https://www.sofaconventions.org .. [#] “AES69-2015: AES Standard for File Exchange-Spatial Acoustic Data File Format.”, 2015. .. [#] https://github.com/spatialaudio/python-sofa """ sofafile = sofa.Database.open(filename) # Check for DataType if sofafile.Data.Type == 'FIR': domain = 'time' data = np.asarray(sofafile.Data.IR) sampling_rate = sofafile.Data.SamplingRate.get_values() # Check for units if sofafile.Data.SamplingRate.Units != 'hertz': raise ValueError( "SamplingRate:Units" "{sofafile.Data.SamplingRate.Units} is not supported.") else: raise ValueError("DataType {sofafile.Data.Type} is not supported.") signal = Signal(data, sampling_rate, domain=domain) # Source s_values = sofafile.Source.Position.get_values() s_domain, s_convention, s_unit = _sofa_pos(sofafile.Source.Position.Type) source_coordinates = Coordinates( s_values[:, 0], s_values[:, 1], s_values[:, 2], domain=s_domain, convention=s_convention, unit=s_unit) # Receiver r_values = sofafile.Receiver.Position.get_values() r_domain, r_convention, r_unit = _sofa_pos(sofafile.Receiver.Position.Type) receiver_coordinates = Coordinates( r_values[:, 0], r_values[:, 1], r_values[:, 2], domain=r_domain, convention=r_convention, unit=r_unit) return signal, source_coordinates, receiver_coordinates
def _sofa_pos(pos_type): if pos_type == 'spherical': domain = 'sph' convention = 'top_elev' unit = 'deg' elif pos_type == 'cartesian': domain = 'cart' convention = 'right' unit = 'met' else: raise ValueError("Position:Type {pos_type} is not supported.") return domain, convention, unit
[docs]def read(filename): """ Read any compatible pyfar object or numpy array (.far file) from disk. Parameters ---------- filename : string, Path Input file. If no extension is provided, .far-suffix is added. Returns ------- collection: dict Contains pyfar objects like ``{ 'name1': 'obj1', 'name2': 'obj2' ... }``. Examples -------- Read signal and orientations objects stored in a .far file. >>> collection = pyfar.read('my_objs.far') >>> my_signal = collection['my_signal'] >>> my_orientations = collection['my_orientations'] """ # Check for .far file extension filename = pathlib.Path(filename).with_suffix('.far') collection = {} with open(filename, 'rb') as f: zip_buffer = io.BytesIO() zip_buffer.write(f.read()) with zipfile.ZipFile(zip_buffer) as zip_file: zip_paths = zip_file.namelist() obj_names_hints = [ path.split('/')[:2] for path in zip_paths if '/$' in path] for name, hint in obj_names_hints: if codec._is_pyfar_type(hint[1:]): obj = codec._decode_object_json_aided(name, hint, zip_file) elif hint == '$ndarray': obj = codec._decode_ndarray(f'{name}/{hint}', zip_file) else: raise TypeError( '.far-file contains unknown types.' 'This might occur when writing and reading files with' 'different versions of Pyfar.') collection[name] = obj return collection
[docs]def write(filename, compress=False, **objs): """ Write any compatible pyfar object or numpy array as .far file to disk. Parameters ---------- filename : string Full path or filename. If now extension is provided, .far-suffix will be add to filename. compress : bool Default is ``False`` (uncompressed). Compressed files take less disk space but need more time for writing and reading. **objs: Objects to be saved as key-value arguments, e.g., ``name1=object1, name2=object2``. Examples -------- Save Signal object, Orientations objects and numpy array to disk. >>> s = pyfar.Signal([1, 2, 3], 44100) >>> o = pyfar.Orientations.from_view_up([1, 0, 0], [0, 1, 0]) >>> a = np.array([1,2,3]) >>> pyfar.io.write('my_objs.far', signal=s, orientations=o, array=a) """ # Check for .far file extension filename = pathlib.Path(filename).with_suffix('.far') compression = zipfile.ZIP_STORED if compress else zipfile.ZIP_DEFLATED zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "a", compression) as zip_file: for name, obj in objs.items(): if codec._is_pyfar_type(obj): codec._encode_object_json_aided(obj, name, zip_file) elif codec._is_numpy_type(obj): codec._encode({f'${type(obj).__name__}': obj}, name, zip_file) else: error = ( f'Objects of type {type(obj)} cannot be written to disk.') if isinstance(obj, fo.Filter): error = f'{error}. Consider casting to {fo.Filter}' raise TypeError(error) with open(filename, 'wb') as f: f.write(zip_buffer.getvalue())