Source code for pdata.analysis.dataview

'''
Class for post-processing measurement data.
'''

from pdata._metadata import __version__ # noqa: F401

import os
import io
import numpy as np
import re
import logging
import gzip
import tarfile
import itertools
import uuid
import json
import jsondiff
import datetime
import pytz
import jinja2
from collections import OrderedDict
from pdata.analysis.heatmap import interp1d

FAST_PARSER_ENABLED = True
try: from pdata.analysis.fast_parser import tabular_data_parser
except ImportError: FAST_PARSER_ENABLED = False

UNIX_EPOCH = datetime.datetime(1970, 1, 1, 0, 0, tzinfo = pytz.utc)

column_name_regex = r"[\w\d\s\-+%=/*&]+"
column_unit_regex = r"[\w\d\s\-+%=/*&]*"

html_template_env = jinja2.Environment(
  loader=jinja2.PackageLoader('pdata', package_path='static'),
  autoescape=lambda fname: fname.lower().endswith(".html"),
  trim_blocks=True, keep_trailing_newline=True, auto_reload=False)


[docs] class PDataSingle(): ''' Class for reading in the contents of a single pdata data directory. Almost always passed on to DataView for actual analysis. ''' def __init__(self, path, convert_timestamps=True, parse_comments=False): '''Parse data stored in the specified directory path. convert_timestamps --> Convert values that look like time stamps into seconds since Unix epoch. parse_comments --> Parse comments placed between data rows. In the current implementation, parsing the comments requires a separate pass through the data. ''' self._path = path def parse_initial_snapshot(): self._snapshots = [] if os.path.exists(os.path.join(path, 'snapshot.json')): with open(os.path.join(path, 'snapshot.json'), 'r') as f: self._snapshots.append((0, json.load(f))) else: with gzip.open(os.path.join(path, 'snapshot.json.gz'), 'rt') as f: self._snapshots.append((0, json.load(f))) def add_snapshot_diff(row, f): # Deep copy the last snapshot -> VERY inefficient but easy & safe snap = json.loads(json.dumps(self._snapshots[-1][-1])) # Add the new copy with the changes self._snapshots.append((row, jsondiff.patch(snap, json.load(f), marshal=True))) def parse_snapshot_diff_names(fnames): """ Given a list of filenames, filter and sort the snapshot diffs. """ diff_names = [] for f in fnames: m = re.match(r'snapshot\.row-(\d+)\.diff(\d+)\.json', f) if m is not None: diff_names.append((int(m.group(1)), int(m.group(2)), m.group(0))) continue diff_names.sort(key=lambda x: x[1]) # secondary sort on .diff<n> diff_names.sort(key=lambda x: x[0]) # primary sort on .row-<n> return diff_names def parse_tabular_data(f): # First extract the first data row and the header rows preceding it. header = PDataSingle._extract_header(f, parse_all_comments=parse_comments) self._comments = header["comments"] #print("\n" + header["table_header"]) #if "first_data_row" in header.keys(): print(header["first_data_row"]) #time.sleep(0.1) # Now parse the stored header if "table_header" not in header.keys(): logging.warning(f"No header found in tabular data of {self._path}") self._column_names, self._units, dtypes = [], [], [] else: self._column_names, self._units = PDataSingle._parse_columns_from_header(header["table_header"]) dtypes, converters = PDataSingle._parse_dtypes_from_header(header["table_header"], convert_timestamps=convert_timestamps) self._column_name_to_index = dict( (n, i) for i,n in enumerate(self._column_names) ) if "first_data_row" in header.keys(): # Analyze first data row inferred_dtypes, inferred_converters = PDataSingle._infer_dtypes_from_first_data_row( header["first_data_row"], convert_timestamps=(dtypes is None and convert_timestamps)) else: inferred_dtypes, inferred_converters = dict( (i, float) for i in range(len(self._column_names)) ), {} assert len(self._column_names) == len(inferred_dtypes.keys()), "The number of columns in the header and first data row do not match." if dtypes is None: dtypes = inferred_dtypes converters = inferred_converters assert len(self._column_names) == len(dtypes.keys()), "The number of columns in the header and number of parsed dtypes do not match." dtypes = list( dtypes[i] for i in range(len(dtypes.keys())) ) if "first_data_row" in header.keys(): # Parse the footer as well, if any f.seek(0, io.SEEK_END) self._footer = PDataSingle._parse_footer(PDataSingle._extract_footer(f)) #print("\n\n"); print(footer); time.sleep(0.1) # Parse the actual numerical data. # # Use "col{i}" as names, rather than self._column_names, # since pdata column names may contain characters not # allowed in numpy structured arrays. f.seek(0) if FAST_PARSER_ENABLED and len(converters.keys())==0 and all(dt in [ float, np.float64, np.float32, np.float16, int, np.int64, np.int32, np.int16, np.int8, np.intc, complex, np.complex128, np.complex64, np.cdouble, str ] for dt in dtypes): try: chunk_size = max(2, self._footer["number_of_data_rows"]) except KeyError: chunk_size = 10000 self._data = tabular_data_parser.parse_tabular_data(f.read(), dtypes, chunk_size=chunk_size) assert len(self._data.keys()) == len(self._column_names), 'Unexcepted number of data columns: %s vs %s' % (len(self._data.keys()), len(self._column_names)) else: # In numpy arrays, arbitrary length strings should have type object instead. # Also decode the byte strings. for j in range(len(dtypes)): if dtypes[j] is str and j not in converters.keys(): dtypes[j] = object converters[j] = lambda x: x.decode('utf-8') self._data = np.genfromtxt(f, delimiter="\t", comments="#", converters=dict( (f"col{i}", c) for i,c in converters.items() ), dtype=dtypes, names=list(f"col{i}" for i in range(len(self._column_names))) ) # If the data contains just a single row, genfromtxt returns a 0D array! Fortunately reshaping still works. # Note: In Numpy >= 1.23.0, setting ndmin for genfromtxt might also solve this but that remains untested. try: len(self._data) except TypeError: self._data = self._data.reshape((-1,)) assert len(self._data[0]) == len(self._column_names), 'Unexcepted number of data columns: %s vs %s' % (len(self._data[0]), len(self._column_names)) self._structured_data = self._data else: logging.warning(f"No data rows in tabular_data of {self._path}") self._data = np.array([], dtype=np.dtype(list( (f"col{i}", dt) for i,dt in enumerate(dtypes) ))) self._footer = {} #print("\n" + repr(self._data)); time.sleep(0.1) ########################################################### # Actually parse the data using the helper functions above ########################################################### # Parse main data file (possibly compressed) if os.path.exists(os.path.join(path, "tabular_data.dat")): with open(os.path.join(path, "tabular_data.dat"), 'rb') as f: parse_tabular_data(f) elif os.path.exists(os.path.join(path, "tabular_data.dat.gz")): with gzip.open(os.path.join(path, "tabular_data.dat.gz"), 'rb') as f: # Read entire file into memory. In the current implementation, # this gives a non-negligible speed benefit, due to use of f.seek() buffered_data = io.BytesIO(f.read()) parse_tabular_data(buffered_data) del buffered_data # Make sure to release the memory as soon as we're done else: other_dat_files = [ pp for pp in os.scandir(path) if pp.name.endswith(".dat") ] if len(other_dat_files) == 0: assert False, f'No .dat file found in {os.path.abspath(path)}' logging.info(f"No tabular_data.dat(.gz) found in {path}. Using {other_dat_files[0].name} instead.") with open(other_dat_files[0].path, 'rb') as f: parse_tabular_data(f) # Parse initial snapshot parse_initial_snapshot() # Parse snapshot diffs tar_fname = os.path.join(path, 'snapshot_diffs.tar.gz') if os.path.exists(tar_fname): with tarfile.open(tar_fname) as tar: for row,j,fname in parse_snapshot_diff_names(tar.getnames()): add_snapshot_diff(row, tar.extractfile(fname)) else: # uncompressed snapshot diffs as separate files for row,j,fname in parse_snapshot_diff_names(os.listdir(path)): with open(os.path.join(path, fname)) as f: add_snapshot_diff(row, f) if "snapshot_diffs_preceding_rows" in self._footer.keys(): # Check that snapshot diff rows parsed from file names match the info in the footer assert all( i==j for i,j in zip(self._footer["snapshot_diffs_preceding_rows"], [ r for r,s in self._snapshots[1:] ] ) ), "Snapshot diff rows parsed from file names don't match rows listed in tabular data footer." def name(self): return os.path.split(self._path)[-1] def filename(self): return self._path def dimension_names(self): return self._column_names def dimension_units(self): return self._units def npoints(self): try: return len(self._data["col0"]) except ValueError: return 0 # No columns in data set
[docs] def data(self): """ DEPRECATED: Return data as a structured numpy array, with column names col0, col1, etc. """ logging.warning("PDataSingle.data() is deprecated and will be removed in a future version. Instead, access the data by column usin data_object[<column name>]") if hasattr(self, "_structured_data"): return self._structured_data n = len(self.dimension_names()) src = np.transpose([ self._data[f"col{i}"] for i in range(n) ]) self._structured_data = np.array([ tuple(row) for row in src ], # <-- This is probably slow but let's not worry about it since this function is deprecated dtype=[(f"col{i}", self._data[f"col{i}"].dtype) for i in range(n)]) return self._structured_data
def comments(self): return self._comments def settings(self): return self._snapshots def __getitem__(self, key): return self._data[f"col{self._column_name_to_index[key]}"] @staticmethod def _parse_timestamp(s): t = datetime.datetime.strptime(s, '%Y-%m-%d %H:%M:%S.%f') return (t.astimezone() - UNIX_EPOCH).total_seconds() @staticmethod def _extract_header(f, parse_all_comments=False): """ Extract header and first data row from tabular data file f. """ r = {} # Dict for results to return r["comments"] = [] rowno = 0 comment = "" while True: line = f.readline() if not isinstance(line, str): line = line.decode('utf-8') if len(line) == 0: break # EOF line = line.strip() if len(line) == 0: continue # empty line if line.startswith('#'): # comment line comment += line[1:].strip() + '\n' continue # Otherwise this is a data row comment = comment.strip() if len(comment) > 0: # Store comment(s) preceding this data row r["comments"].append((rowno, comment)) # The comment rows preceding the first data row contain the # table header that defines the column names. Store the # header and the first data row for later parsing. if rowno==0: r["table_header"] = comment r["first_data_row"] = line rowno += 1 comment = "" # Done parsing the header. We can stop here if not requested # to parse all comments, also after first data row. if not parse_all_comments: break # Store header even if there were zero data rows if rowno==0: # Header is in the "comment" variable at this point. But # "comment" may also contant a footer so strip everything # after "Measurement ended at". table_header = [] for line in comment.split("\n"): if line.strip().startswith("Measurement ended at "): break table_header.append(line) r["table_header"] = "\n".join(table_header) return r @staticmethod def _infer_dtypes_from_first_data_row(line, convert_timestamps): """Infer data types from the first data row (in case the information is not available in the table header). """ converters = {} dtypes = {} for i,c in enumerate(line.split('\t')): c = c.strip().lower() if c in ["true", "false"]: dtypes[i] = bool continue if convert_timestamps: # If col is a time stamp, convert it into seconds since Unix epoch. try: PDataSingle._parse_timestamp(c) converters[i] = lambda x: PDataSingle._parse_timestamp(x.decode('utf-8')) dtypes[i] = float logging.info(f'Column {i} appears to contain timestamps. Converting them to seconds since Unix epoch. (Disable by setting convert_timestamps=False.)') continue except ValueError: pass # Not a timestamp try: # Convert all numerical types to float, including # integers. This is a bit safer, in case the first row looks # like an int but other rows contain floats. float(c) dtypes[i] = float continue except ValueError: pass # Not a float, int, or similar number # Otherwise parse this column as strings dtypes[i] = str return dtypes, converters @staticmethod def _parse_dtypes_from_header(s, convert_timestamps): """Check for dtype specification in the "Column dtypes: float\tfloat\tint\t..." format. """ m = re.search(r'(?m)^\s*Column dtypes:\s*(.*?)?$', s) if m is None or len(m.groups()) != 1: return None, None dtypes = {} converters = {} for i,dt in enumerate(m.group(1).split("\t")): dt = dt.strip() if dt.startswith("numpy."): try: dtypes[i] = getattr(np, dt[len("numpy."):]) except AttributeError: logging.warning(f"Column {i} dtype = {dt} seems like a numpy data type based on prefix, " f"but numpy.{dt} doesn't exist. Falling back to string.") dtypes[i] = str elif dt.startswith("builtins."): dtypes[i] = eval(dt[len("builtins."):]) elif dt in ["datetime.datetime", "datetime"]: if convert_timestamps: logging.info(f'Column {i} contains timestamps. Converting them to seconds since Unix epoch. (Disable by setting convert_timestamps=False.)') dtypes[i] = float converters[i] = lambda x: PDataSingle._parse_timestamp(x.decode('utf-8')) else: dtypes[i] = datetime.datetime converters[i] = lambda x: dtypes[i](x.decode('utf-8')) else: if dt not in [ "None" ]: logging.warning(f"Column {i} dtype = {dt} unrecognized. Falling back to string.") dtypes[i] = str return dtypes, converters @staticmethod def _parse_columns_from_header(s): """Parse column names and units from table header. Asssume that the last non-empty header line has them in the "Column name (unit)\t" format. If not, fall back to assuming similar but simpler legacy QCoDeS format. """ # Split into lines and keep only non-empty ones column_names_and_units = [ l for l in s.split('\n') if len(l.strip())>0 ] if len(column_names_and_units) == 0: return [],[] try: cols = [] units = [] for c in column_names_and_units[-1].split('\t'): m = re.match(f'({column_name_regex})\\s+\\(({column_unit_regex})\\)', c.strip()) cols.append(m.group(1)) units.append(m.group(2)) except AttributeError: # Try assuming the legacy format used in QCoDeS (qcodes/data/gnuplot_format.py) try: # Last row contains the number of data points --> ignore if column_names_and_units[-1].strip().isdigit(): del column_names_and_units[-1] cols = [ c.strip().strip('"') for c in column_names_and_units[-1].split('\t') ] units = [ '' for i in range(len(cols))] except IndexError: logging.warning(f"Could not parse tabular data header. Header: {s}") raise return cols, units @staticmethod def _extract_footer(f, chunk_size=4096): """Return tabular data footer from file object f. The footer contains, by defition, all rows following the last data row (= last non-empty row not starting with #). Assumes that current position is already at the end of the file. If there are zero data rows, the parsed string may also include the header. """ b = isinstance(f.read(0), bytes) # Check whether we read bytes or strings from f tail = b"" if b else "" while True: try: # Append one more chunk to tail f.seek(-min(f.tell(), (1 + (len(tail)>0))*chunk_size), os.SEEK_CUR) tail = f.read(chunk_size) + tail except IOError: # Read entire file into tail logging.debug(f"Could not read footer in chunks from end of file object {f} --> Reading entire file.") f.seek(0) tail = f.read() break # Check whether tail already contains non-comment rows. # If so, we must have read the entire footer already if any( not (l.strip().startswith(b"#" if b else "#") or len(l.strip())==0) for l in tail.split(b"\n" if b else "\n") ): break # Remove non-comment rows: footer = [] for l in reversed(tail.split(b"\n" if b else "\n")): l = l.strip() if len(l)==0: continue if b: l = l.decode("utf-8") if not l.startswith("#"): break footer.append(l[1:].strip()) return "\n".join(reversed(footer)) @staticmethod def _parse_footer(raw_footer): r = { "raw_footer": raw_footer } # Parse information from standard rows in the footer. m = re.search(r'(?m)^\s*Snapshot diffs preceding rows \(0-based index\):\s*(.*?)?$', raw_footer) if m is not None and len(m.groups()) == 1: try: r["snapshot_diffs_preceding_rows"] = np.array([ int(i) for i in m.group(1).split(",") ] if m.group(1).strip() != "" else [], dtype=int) except: # noqa: E722 logging.exception(f"Failed to parse snapshot diff row spec '{m.group(1)}' into a list of ints.") m = re.search(r'(?m)^\s*Measurement ended at\s+(.*?)?$', raw_footer) if m is not None and len(m.groups()) == 1: try: r["measurement_ended_at"] = datetime.datetime.strptime(m.group(1), '%Y-%m-%d %H:%M:%S.%f') except: # noqa: E722 logging.exception(f"Failed to parse measurement end time '{m.group(1)}' into a datetime object.") m = re.search(r'(?m)^\s*Number of data rows:\s*(\d+)$', raw_footer) if m is not None and len(m.groups()) == 1: r["number_of_data_rows"] = int(m.group(1)) return r
[docs] class DataView(): ''' Class for post-processing measurement data. Main features are: * Concatenating multiple separate data objects * Creating "virtual" columns by parsing comments or snapshot files or by applying arbitrary functions to the data * Dividing the rows into "sweeps" based on various criteria. See docs/examples/Procedural Data and DataView.ipynb for example use. ''' def __init__(self, data, deep_copy=False, source_column_name='data_source'): ''' Create a new view of existing data objects for post-processing. The original data objects will not be modified. args: data -- Data object(s). Each data object needs to provide the following methods: * name() # Arbitrary string identifier for the data object * filename() # Specifies the path to the main datafile # (for identification/debugging purpose only) * dimension_names() # List of all data column names * dimension_units() # List of all data column units * npoints() # Number of data points/rows. * data() # 2D ndarray containing all data rows and columns. * comments() # List of tuples (data_row_no, comment string), # where data_row_no indicated the index of # the data point that the comment precedes. * settings() # List of tuples (data_row_no, settings dict), # where data_row_no indicated the index of # the data point that the settings apply to. kwargs input: deep_copy -- specifies whether the underlying data is copied or only referenced (more error prone, but memory efficient) source_column_name -- specifies the name of the (virtual) column that tells which data object the row originates from. Specify None, if you don't want this column to be added. ''' self._virtual_dims = {} if isinstance(data, DataView): # clone # these private variables should be immutable so no need to deep copy self._dimensions = data._dimensions self._units = data._units self._source_col = data._source_col self._comments = data._comments self._settings = data._settings if deep_copy: self._data = data._data.copy() else: self._data = data._data # Always deep copy the mask self._mask = data._mask.copy() for name, fn in data._virtual_dims.items(): self._virtual_dims[name] = fn return def get_source_column_name(dat): return f"{dat.name()}_({dat.filename().strip('.dat')})" def is_pdatasingle_like(x): return ( hasattr(x, "dimension_names") and hasattr(x, "dimension_units") and hasattr(x, "name") and hasattr(x, "comments") ) if is_pdatasingle_like(data): # data is a single Data object self._dimensions = data.dimension_names() self._units = dict(zip(data.dimension_names(), data.dimension_units())) unmasked = dict( (dim, data[dim]) for dim in data.dimension_names() ) if source_column_name is not None: n = get_source_column_name(data) self._source_col = [n for i in range(data.npoints())] else: self._source_col = None self._comments = data.comments() try: self._settings = data.settings() except: # noqa: E722 logging.exception("Could not parse the instrument settings file. Doesn't matter if you were not planning to add virtual columns based on values in the snapshot files.") self._settings = None else: # probably data is a sequence of Data objects then assert all(is_pdatasingle_like(dd) for dd in data), "data does not seem to be a PDataSingle-like object, nor a sequence of them: " + repr(data) self._dimensions = set(itertools.chain( *(dd.dimension_names() for dd in data) )) unmasked = {} for dim in self._dimensions: unmasked[dim] = [] for dat in data: if len(dat.dimension_names()) == 0: logging.warning("%s seems to contain zero columns. Skipping it..." % (dat.filename())) continue n_rows = dat.npoints() if n_rows == 0: logging.info("%s seems to contain zero rows. Skipping it..." % (dat.filename())) continue try: unmasked[dim].append(dat[dim]) except KeyError: logging.warning(f"Dimension {dim} does not exist in data object {str(dat)}. Omitting the dimension.") del unmasked[dim] break # concatenate rows from all files if dim in unmasked.keys(): unmasked[dim] = np.concatenate(unmasked[dim]) if len(unmasked[dim])>0 else np.array([]) # add a column that specifies the source data file lens = [ dat.npoints() for dat in data ] if source_column_name is not None: names = [ get_source_column_name(dat) for dat in data ] self._source_col = [ [n for jj in range(l)] for n,l in zip(names,lens) ] #self._source_col = [ jj for jj in itertools.chain.from_iterable(self._source_col) ] # flatten self._source_col = list(itertools.chain.from_iterable(self._source_col)) # flatten else: self._source_col = None # keep only dimensions that could be parsed from all files self._dimensions = unmasked.keys() # take units from first data set self._units = dict(zip(data[0].dimension_names(), data[0].dimension_units())) # concatenate comments, adjusting row numbers from Data object rows to the corresponding dataview rows lens = np.array(lens) self._comments = [ dat.comments() for dat in data ] all_comments = [] for jj,comments in enumerate(self._comments): all_comments.append([ (rowno + lens[:jj].sum(), commentstr) for rowno,commentstr in comments ]) self._comments = list(itertools.chain.from_iterable(all_comments)) # flatten by one level # concatenate settings (snapshot) files in the same way self._settings = [ dat.settings() for dat in data ] all_settings = [] for jj,settings in enumerate(self._settings): all_settings.append([ (rowno + lens[:jj].sum(), sett) for rowno,sett in settings ]) self._settings = list(itertools.chain.from_iterable(all_settings)) # flatten by one level # Check for existence of multiple settings dicts for a single # data row. If they exist, we only care about the last one. --> Remove others. for i in range(len(self._settings)-1,0,-1): if self._settings[i][0] == self._settings[i-1][0]: del self._settings[i-1] # Initialize masks self._data = unmasked self._mask = np.zeros(0 if len(unmasked.keys())==0 else len(unmasked[list(unmasked.keys())[0]]), dtype=bool) self._mask_stack = [] self.set_mask(False) if source_column_name is not None: self.add_virtual_dimension(source_column_name, arr=np.array(self._source_col)) def __getitem__(self, index): ''' Get the values of a given dimension as a vector. ''' assert isinstance(index, str), "Data must be indexed using a dimension name. Dimensions in this Dataview: {self.dimensions()}" return self.column(index)
[docs] def copy(self, copy_data=False): ''' Make a copy of the view. The returned copy will always have an independent mask. copy_data -- whether the underlying data is also deep copied. ''' return DataView(self, deep_copy=copy_data)
[docs] def data_source(self): ''' Returns a list of strings that tell which Data object each of the unmasked rows originated from. ''' return [ i for i in itertools.compress(self._source_col, ~(self._mask)) ]
[docs] def clear_mask(self): ''' Unmask all data (i.e. make all data in the initially provided Data object visible again). ''' self._mask[:] = False self._mask_stack = []
[docs] def mask(self): ''' Get a vector of booleans indicating which rows are masked. ''' return self._mask.copy()
[docs] def dimensions(self): ''' Returns a list of all dimensions, both real and virtual. ''' return list(itertools.chain(self._data.keys(), self._virtual_dims.keys()))
[docs] def units(self, d): ''' Returns the units for dimension d ''' return self._units[d]
[docs] def comments(self): ''' Return the comments parsed from the data files. Returns tuples where the first item is an index to the first datarow that the comment applies to. ''' return self._comments
[docs] def settings(self): ''' Return the settings parsed from the settings files. Returns tuples where the first item is an index to the first datarow that the settings apply to. ''' return self._settings
[docs] def continuous_ranges(self, masked_ranges=False): ''' Returns a list of (start,stop) tuples that indicate continuous ranges of (un)masked data. ''' m = self.mask() * (-1 if masked_ranges else 1) dm = m[1:] - m[:-1] starts = 1+np.where(dm < 0)[0] stops = 1+np.where(dm > 0)[0] if not m[0]: starts = np.concatenate(( [0], starts )) if not m[-1]: stops = np.concatenate(( stops, [len(m)] )) return zip(starts, stops)
[docs] def set_mask(self, mask): ''' Set an arbitrary mask for the data. Should be a vector of booleans of the same length as the number of data points. Alternatively, simply True/False masks/unmasks all data. See also mask_rows(). ''' if mask is True: self._mask[:] = True elif mask is False: self._mask[:] = False else: m = np.zeros(len(self._mask), dtype=bool) m[mask] = True self._mask = m
[docs] def mask_rows(self, row_mask, unmask_instead = False): ''' Mask rows in the data. row_mask can be a slice or a boolean vector with length equal to the number of previously unmasked rows. The old mask is determined from the mask of the first column. Example: d = DataView(...) # ignore points where source current exceeds 1 uA. d.mask_rows(np.abs(d['I_source']) > 1e-6) ''' old_mask = self._mask n = (~old_mask).astype(int).sum() # no. of previously unmasked entries #logging.debug("previously unmasked rows = %d" % n) # new mask for the previously unmasked rows new_mask = np.empty(n, dtype=bool); new_mask[:] = unmask_instead new_mask[row_mask] = (not unmask_instead) #logging.debug("new_mask.sum() = %d" % new_mask.sum()) # combine the old and new masks full_mask = old_mask.copy() full_mask[~old_mask] = new_mask logging.debug("# of masked/unmasked rows = %d/%d" % (full_mask.astype(int).sum(), (~full_mask).astype(int).sum())) self.set_mask(full_mask)
[docs] def push_mask(self, mask, unmask_instead = False): ''' Same as mask_rows(), but also pushes the mask to a 'mask stack'. Handy for temporary masks e.g. inside loops. See also pop_mask(). ''' self._mask_stack.append(self.mask()) self.mask_rows(mask, unmask_instead = unmask_instead)
[docs] def pop_mask(self): ''' Pop the topmost mask from the mask stack, set previous mask in the stack as current one and return the popped mask. Raises an exception if trying to pop an empty stack. ''' try: previous_mask = self._mask_stack.pop() except IndexError as e: raise Exception("Trying to pop empty mask stack: %s" % e) self.set_mask(previous_mask) return previous_mask
[docs] def remove_masked_rows_permanently(self): ''' Removes the currently masked rows permanently. This is typically unnecessary, but may be useful before adding (cached) virtual columns to huge data sets where most rows are masked (because the cached virtual columns are computed for masked rows as well.) ''' # Removing the real data rows themselves is easy. for d in self._data.keys(): self._data[d] = self._data[d][~(self._mask)] # but we have to also adjust the comment & settings line numbers s = np.cumsum(self._mask.astype(int)) def n_masked_before_line(lineno): return s[max(0, min(len(s)-1, lineno-1))] self._comments = [ (max(0,lineno-n_masked_before_line(lineno)), comment) for lineno,comment in self._comments ] self._settings = [ (max(0,lineno-n_masked_before_line(lineno)), setting) for lineno,setting in self._settings ] # as well as remove the masked rows from cached virtual columns. # However, _virtual_dims is assumed to be immutable in copy() so # we must copy it here! old_dims = self._virtual_dims self._virtual_dims = {} for name, dim in old_dims.items(): cached_arr = dim['cached_array'] if isinstance(cached_arr, np.ndarray): cached_arr = cached_arr[~(self._mask)] elif cached_arr is not None: cached_arr = [ val for i,val in enumerate(cached_arr) if not self._mask[i] ] self._virtual_dims[name] = { 'fn': dim['fn'], 'cached_array': cached_arr } # finally remove the obsolete mask(s) self._mask = np.zeros(len(self._data[list(self._data.keys())[0]]), dtype=bool) self._mask_stack = []
[docs] def single_valued_parameter(self, param): ''' If all values in the (virtual) dimension "param" are the same, return that value. ''' assert len(np.unique(self[param])) == 1 or (all(np.isnan(self[param])) and len(self[param]) > 0), \ '%s is not single valued for the current unmasked rows: %s' % (param, np.unique(self[param])) return self[param][0]
def all_single_valued_parameters(self): params = OrderedDict() for p in self.dimensions(): try: params[p] = self.single_valued_parameter(p) except: pass # noqa: E722 return params
[docs] def sweeps(self, sweep_dimension, use_sweep_direction = None): '''Generator that returns shallow copies of this DataView with unmasked rows corresponding to sweeps. For more details on the arguments and how the rows are divided into sweeps, see divide_into_sweeps() ''' for s in self.divide_into_sweeps(sweep_dimension=sweep_dimension, use_sweep_direction=use_sweep_direction): dd = self.copy(); dd.mask_rows(s, unmask_instead=True) yield dd
[docs] def divide_into_sweeps(self, sweep_dimension, use_sweep_direction = None): '''Divide the rows into "sweeps" based on a monotonously increasing or decreasing value of column "sweep_dimension", if use_sweep_direction==True. If use_sweep_direction==False, sequences of points where "sweep_dimension" stays constant are considered sweeps. This is useful for splitting the data into sweeps based on a slowly varying parameter, e.g. a gate voltage set point that is changed between IV curve sweeps. If use_sweep_direction is None, this function tries to figure out which one is more reasonable. Returns a sequence of slices indicating the start and end of each sweep. Note that the indices are relative to the currently _unmasked_ rows only. ''' sdim = self[sweep_dimension] if isinstance(sdim, np.ndarray) and isinstance(sdim[0], (np.str_, np.bool_)): use_sweep_direction = False dx = sdim[1:] != sdim[:-1] elif isinstance(sdim[0], (str, bool)): # as above but native Python list use_sweep_direction = False dx = np.array([ sdim[i+1] != sdim[i] for i in range(len(sdim)-1) ]) else: # The usual case dx = np.sign(sdim[1:] - sdim[:-1]) if use_sweep_direction is None: use_sweep_direction = ( np.abs(dx).astype(int).sum() > len(dx)/4. ) if use_sweep_direction: logging.info("Assuming '%s' is swept." % sweep_dimension) else: logging.info("Assuming '%s' stays constant within a sweep." % sweep_dimension) if use_sweep_direction: for i in range(1,len(dx)): if i+1 < len(dx) and dx[i] == 0: dx[i]=dx[i+1] # this is necessary to detect changes in direction, when the end point is repeated change_in_sign = (2 + np.array(np.where(dx[1:] * dx[:-1] < 0),dtype=int).reshape((-1))).tolist() # the direction changing twice in a row means that sweeps are being done repeatedly # in the same direction. for i in range(len(change_in_sign)-1, 0, -1): if change_in_sign[i]-change_in_sign[i-1] == 1: del change_in_sign[i] if len(change_in_sign) == 0: return [ slice(0, len(sdim)) ] start_indices = np.concatenate(([0], change_in_sign)) stop_indices = np.concatenate((change_in_sign, [len(sdim)])) sweeps = np.concatenate((start_indices, stop_indices)).reshape((2,-1)).T else: change_in_sdim = 1 + np.array(np.where(dx != 0)).reshape((-1)) if len(change_in_sdim) == 0: return [ slice(0, len(sdim)) ] start_indices = np.concatenate(([0], change_in_sdim)) stop_indices = np.concatenate((change_in_sdim, [len(sdim)])) sweeps = np.concatenate((start_indices, stop_indices)).reshape((2,-1)).T return [ slice(max(s, 0), min(e, len(sdim))) for s,e in sweeps ]
[docs] def mask_sweeps(self, sweep_dimension, sl, unmask_instead=False): ''' Mask entire sweeps (see divide_into_sweeps()). sl can be a single integer or any slice object compatible with a 1D numpy.ndarray (list of sweeps). unmask_instead -- unmask the specified sweeps instead, mask everything else ''' sweeps = self.divide_into_sweeps(sweep_dimension) row_mask = np.zeros(len(self[sweep_dimension]), dtype=bool) for start,stop in ([sweeps[sl]] if isinstance(sl, int) else sweeps[sl]): logging.debug("%smasking start: %d, stop %d" % ('un' if unmask_instead else '',start, stop)) row_mask[start:stop] = True self.mask_rows(~row_mask if unmask_instead else row_mask)
[docs] def unmask_sweeps(self, sweep_dimension, sl): ''' Mask all rows except the specified sweeps (see divide_into_sweeps()). sl can be a single integer or any slice object compatible with a 1D numpy.ndarray (list of sweeps). ''' self.mask_sweeps(sweep_dimension, sl, unmask_instead=True)
[docs] def column(self, name, deep_copy=False): ''' Get the non-masked entries of dimension 'name' as a 1D ndarray. name is the dimension name. kwargs: deep_copy -- copy the returned data so that it is safe to modify it. ''' if name in self._virtual_dims.keys(): d = self._virtual_dims[name]['cached_array'] if d is None: d = self._virtual_dims[name]['fn'](self) if len(d) == len(self._mask): # The function may return masked or unmasked data... # The function returned unmasked data so apply the mask try: d = d[~(self._mask)] # This works for ndarrays except: # noqa: E722 # workaround to mask native python arrays d = [ x for i,x in enumerate(d) if not self._mask[i] ] return d else: d = self._data[name][~(self._mask)] if deep_copy: d = d.copy() return d
non_numpy_array_warning_given = []
[docs] def add_virtual_dimension(self, name, units="", fn=None, arr=None, comment_regex=None, from_set=None, dtype=float, preparser=None, cache_fn_values=True, return_result=False): ''' Makes a computed vector accessible as self[name]. The computed vector depends on whether fn, arr or comment_regex is specified. It is advisable that the computed vector is of the same length as the real data columns. kwargs: Arguments for specifying how to parse the value: fn -- the function applied to the DataView object, i.e self[name] returns fn(self) arr -- specify the column directly as an array, i.e. self[name] returns arr comment_regex -- for each row, take the value from the last match in a comment, otherwise np.NaN. Should be a regex string. from_set -- for each row, take the value from the corresponding snapshot file. Specify as a tuple that indexes the settings dict ("instrument_name", "parameter_name", ...). Other options: dtype -- data type (default: float) preparser -- optional preparser function that massages the value before it is passed to dtype cache_fn_values -- evaluate fn(self) immediately for the entire (unmasked) array and cache the result return_result -- return the result directly as an (nd)array instead of adding it as a virtual dimension ''' logging.debug('adding virtual dimension "%s"' % name) assert (fn is not None) + (arr is not None) + (comment_regex is not None) + (from_set is not None) == 1, 'You must specify exactly one of "fn", "arr", or "comment_regex".' if arr is not None: assert len(arr) == len(self._mask), f'len(arr)={len(arr)} must be the same as the length of the existing data columns ({len(self._mask)}).' if from_set is not None: assert self._settings is not None, 'snapshot files were not successfully parsed during dataview initialization.' if comment_regex is not None or from_set is not None: # construct the column by parsing the comments or snapshots use_set = (from_set is not None) # shorthand for convenience # pre-allocate an array for the values try: if issubclass(dtype, str): raise Exception('Do not store strings in numpy arrays (because it "works" but the behavior is unintuitive, i.e. only the first character is stored if you just specify dtype=str).') vals = np.zeros(len(self._mask), dtype=dtype) if dtype is float: vals += np.nan # initialize to NaN instead of zeros except: # noqa: E722 if name not in self.non_numpy_array_warning_given: logging.info("%s does not seem to be a numpy data type. The virtual column '%s' will be a native python array instead, which may be slow." % (str(dtype), name)) self.non_numpy_array_warning_given.append(name) vals = [None for jjj in range(len(self._mask))] def set_vals(up_to_row, new_val): """ Helper that sets values up to the specified row, starting from where we last left off. This is a little trickier than might seem at first because when we parse a new value, we don't yet know the row up to which it applies. Instead, we always set the previous value up to row where the new value appeared (and remember the new value for the next call). """ if up_to_row > set_vals.prev_match_on_row: # Apply preparser() and dtype(() to the previously parsed value. # # It's good to do it only here because occasionally there may be multiple definitions for the # same column and same row, usually on row zero. # These might not all have valid syntax for preparser/dtype() # so it's best to only parse the one that matters (the last one). v = set_vals.prev_val try: if preparser is not None: v = preparser(v) v = dtype(v) except: #logging.exception('Could not convert the parsed value (%s) to the specifed data type (%s).' # % (v, dtype)) raise if isinstance(vals, np.ndarray): vals[set_vals.prev_match_on_row:up_to_row] = v else: vals[set_vals.prev_match_on_row:up_to_row] = ( v for jjj in range(up_to_row-set_vals.prev_match_on_row) ) logging.debug('Setting value for rows %d:%d = %s' % (set_vals.prev_match_on_row, up_to_row, v)) set_vals.prev_match_on_row = up_to_row set_vals.prev_val = new_val set_vals.prev_match_on_row = 0 #logging.debug(self._comments) for rowno,commentstr in (self._settings if use_set else self._comments): if use_set: # simply use the value from the snapshot file assert from_set[0] in commentstr.keys(), '"%s" not found in settings.' % from_set[0] new_val = commentstr for k in from_set: new_val = new_val[k] else: # see if the comment matches the specified regex m = re.search(comment_regex, commentstr) if m is None: continue #logging.debug('Match on row %d: "%s"' % (rowno, commentstr)) if len(m.groups()) != 1: logging.warning('Did not get a unique match (%s) in comment (%d): %s' % (str(m.groups()), rowno, commentstr)) new_val = m.group(1) set_vals(up_to_row=rowno, new_val=new_val) logging.debug('Setting value for (remaining) rows %d: = %s' % (set_vals.prev_match_on_row, set_vals.prev_val)) set_vals(up_to_row=len(vals), new_val=None) return self.add_virtual_dimension(name, units=units, arr=vals, return_result=return_result) if cache_fn_values and arr is None: old_mask = self.mask().copy() # backup the mask self.clear_mask() vals = fn(self) self.mask_rows(old_mask) # restore the mask return self.add_virtual_dimension(name, units=units, arr=vals, cache_fn_values=False, return_result=return_result) if return_result: return arr else: self._virtual_dims[name] = {'fn': fn, 'cached_array': arr} self._units[name] = units
def remove_virtual_dimension(self, name): if name in self._virtual_dims.keys(): del self._virtual_dims[name] else: logging.warning('Virtual dimension "%s" does not exist.' % name) def remove_virtual_dimensions(self): self._virtual_dims = {}
[docs] def to_xarray(self, values, coords, fill_value=np.nan, coarse_graining={}, include_single_valued_params=True): """Create an N-dimensional xarray DataSet out of values, where N is equal to the number of coordinates and values are specified as a list of dataview dimension names, or (<data variable name>, f, <units>) tuples where f(self) returns a vector of length equal to the number of unmasked rows in this DataView. Alternatively, values can be a single dimension name. Coordinates are specified as a list of dimension names. Entries of the xarray corresponding to coordinate combinations that don't exist in this data set are filled with fill_value. This is well-suited for N-dimensional parameter/coordinate sweeps that were executed with nested for loops in which the looped coordinate values in each loop were selected mostly independent of other coordinates. Otherwise there will be lots of fill_value's. Usually, you'll want to use setpoints, rather than measured values, as coordinates. If a coordinate c is instead a measured value, you probably want to specify coarse graining with coarse_graining={c: <Delta>}, which causes coordinates differing by at most <Delta> to be interpreted as the same coordinate. Note that if the same coordinate combination is repeated more than once in the data set, only the last measured value will appear in the output xarray. If you want to preserve information about repetitions, add another coordinate for the repetition number. If include_single_valued_params is True, all single valued parameters will be included as attributes of the xarray. Spaces, dashes and other special characters in coordinate names are replaced automatically by underscores, as these don't work well with xarray syntax. """ assert np.isnan(fill_value), "Specifying any fill value other than np.nan is unsupported." # Get unique coordinate values for each coordinate coords = OrderedDict((c, np.unique(self[c])) for c in coords ) coord_values = OrderedDict((c, self[c]) for c in coords ) # Merge similar coordinates for c,delta in coarse_graining.items(): m = np.ones(len(coords[c]), dtype=bool) # indices to keep rng = np.arange(len(m), dtype=int) while m.sum()>1: dx_too_small = rng[:m.sum()-1][np.abs(np.diff(coords[c][m])) < delta] if len(dx_too_small) == 0: break # If there are consecutive small deltas, only drop the last one. dx_too_small = np.append( dx_too_small[:-1][np.diff(dx_too_small)>1], dx_too_small[-1] ) m[rng[m][dx_too_small]] = False # Never drop the smallest or largest coordinate m[0]=True; m[-1]=True # Overwrite the coord axis (i.e. ordered unique values) coords[c] = coords[c][m] # Replace the actual coordinate values in the data by the coarse grained ones coord_values[c] = interp1d(coord_values[c], coords[c], coords[c]) # <-- "nearest" interpolation # Create the xarrays # Preprocess 'values' argument into a uniform format: # { <data variable name>: (<data points>, <units>)) } # Accept a single dimension name as input if isinstance(values, str): values = [ values ] # Accept <dimension name> as well as (<data variablename>, f, <units>) values = dict( (v, (self[v], self.units(v))) if isinstance(v, str) else (v[0], (v[1](self), v[2])) for v in values ) # Replace special characters by underscores in dimension names special_chars = r"[\s\-+%=/*&]" sanitized_name = dict((k, k) for k in itertools.chain(coords.keys(), values.keys())) for i,c in enumerate(coords.keys()): if re.search(special_chars, c) is not None: new_c = re.sub(special_chars, "_", c, count=len(c)) assert new_c not in sanitized_name.values(), f"{new_c} already exists in coords: {sanitized_name.values()}" sanitized_name[c] = new_c if include_single_valued_params: single_valued_params = self.all_single_valued_parameters() # Convert to a DataFrame containing the data as one row per # datapoint (same as in DataView) import pandas frame = pandas.DataFrame(dict(itertools.chain( ( (sanitized_name[c], coord_values[c]) for c in coords.keys() ), ( (sanitized_name[n], v[0]) for n,v in values.items() ) ))) # Use pivot to efficiently "unstack" the data into an # xarray-style n-dimensional array pvt = frame.pivot(index=[sanitized_name[c] for c in coords.keys()], columns=[]) # Check for duplicate values if np.any(pvt.index.duplicated()): logging.warning("Multiple values in the Dataview data map to the same coordinates in the xarray.") pvt = pvt[~pvt.index.duplicated(keep='last')] # Drop duplicates, if any # Convert to xarray and add metadata #dataset = xarray.Dataset.from_dataframe(pvt) dataset = pvt.to_xarray() coord_units = dict( (sanitized_name[c], self.units(c)) for c in coords.keys() ) for n,v in values.items(): dataset[sanitized_name[n]].attrs["units"] = v[1] dataset[sanitized_name[n]].attrs["coord_units"] = coord_units if include_single_valued_params: for p,v in single_valued_params.items(): dataset[n].attrs[p] = v return dataset
def _repr_html_(self): """Output HTML representation for Jupyter display. The HTML representations use CSS and HTML files from `xarray <https://docs.xarray.dev/en/stable/index.html>`_, which is licensed under Apache License Version 2.0 (see licenses/XARRAY_LICENSE file). xarray in turn uses icons from the icomoon package, which is licensed under CC BY 4.0 (see licenses/ICOMOON_LICENSE). """ template = html_template_env.get_template('dataview-template.html') dimension_list = [] for dim_name in self.dimensions(): dimension_list.append({ 'uuid': str(uuid.uuid4()), 'name': dim_name, 'units': self.units(dim_name), 'dtype': self._get_dtype_repr(dim_name), 'vals_preview': DataView._array_to_str(self[dim_name], 1), 'vals': DataView._array_to_str(self[dim_name], 200), }) settings_list = [] for row,s in self.settings(): full = json.dumps(s, indent=2) settings_list.append({ 'uuid': str(uuid.uuid4()), 'row': row, 'preview': full[:min(len(full), 50)] + ("..." if len(full)>50 else ""), 'full': full[:min(len(full), 900)] + ("..." if len(full)>900 else ""), }) # Generate the HTML html_out = template.render( global_props={ 'nrows': len(self.mask()), 'n_unmasked_rows': sum(1 - self.mask()), 'ndatasets': len(np.unique(self["data_source"])) if "data_source" in self.dimensions() else "?" }, dimlist_uuid= str(uuid.uuid4()), settingslist_uuid= str(uuid.uuid4()), dimension_list=dimension_list, settings_list=settings_list, settings_to_display=min(20, len(settings_list))) #with open(os.path.join('.', 'dataview_repr.html'), 'w') as f: f.write(html_out) return f"<div>\n{style_css}\n{html_out}</div>" def _get_dtype_repr(self, dim_name): """ Get data type of self[dim_name] in a compact string format. """ if len(self.mask())==0: return "?" dt = type(self[dim_name][0]) return dt.__name__.rstrip('_') if hasattr(dt, "__name__") else str(dt) @staticmethod def _array_to_str(arr, maxrows=7, vals_per_row=4): """ Convert an array to string for visualization purposes. """ if maxrows==1: if len(arr) == vals_per_row: return ', '.join(str(x) for x in arr) s = ', '.join(str(x) for x in arr[:min(vals_per_row-1, len(arr))]) if len(arr) < vals_per_row: return s return s + ', ..., ' + str(arr[-1]) rows = [] while (len(rows)+1)*vals_per_row < len(arr) and len(rows) < maxrows-1: rows.append(', '.join(str(x) for x in arr[len(rows)*vals_per_row:(len(rows)+1)*vals_per_row])) if len(rows)==maxrows-1 and (len(rows)+1)*vals_per_row < len(arr): rows.append('...') rows.append(', '.join(str(x) for x in arr[-min(vals_per_row, len(arr)):])) #print(rows[0]) return ",\n".join(rows)
# Preload style.css and icons for html output. # # There are no Jinja statements in them, but the Jinja PackageLoader # is convenient for locating the files in a robust way. style_css = f""" {html_template_env.get_template('icons-svg-inline.html').render()} <style> {html_template_env.get_template('style.css').render()} </style> """