Source code for hveto.triggers

# -*- coding: utf-8 -*-
# Copyright (C) Joshua Smith (2016-)
#
# This file is part of the hveto python package.
#
# hveto is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# hveto is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with hveto.  If not, see <http://www.gnu.org/licenses/>.

"""Trigger I/O utilities for hveto
"""

import glob
import os.path
import re
import warnings
from collections import OrderedDict

import numpy

from astropy.table import vstack as vstack_tables

import gwtrigfind

from gwpy.io import cache as io_cache
from gwpy.io.utils import file_list
from gwpy.io.registry import get_read_format
from gwpy.table import EventTable
from gwpy.table.filters import in_segmentlist
from gwpy.table.io.pycbc import filter_empty_files as filter_empty_pycbc_files
from gwpy.segments import SegmentList

# Table metadata keys to keep
TABLE_META = ('tablename',)

DEFAULT_FORMAT = {
    # ETG      format
    "omicron": "hdf5",
    "pycbc_live": "hdf5.pycbc_live",
    "kleinewelle": "ligolw",
    "dmt_omega": "ligolw",
}

DEFAULT_TRIGFIND_OPTIONS = {
    # ETG          format
    ("dmt_omega", "ligolw"): {
        "ext": "xml.gz",
    },
    ("kleinewelle", "ligolw"): {
        "ext": "xml.gz",
    },
    ("omicron", "hdf5"): {
        "ext": "h5",
    },
    ("omicron", "ligolw"): {
        "ext": "xml.gz",
    },
    ("omicron", "root.omicron"): {
        "ext": "root",
    }
}

_SNGL_BURST_OPTIONS = {
    "tablename": "sngl_burst",
    "columns": ["peak", "peak_frequency", "snr"],
    "use_numpy_dtypes": True,
}

DEFAULT_READ_OPTIONS = {
    # ETG          format
    ("dmt_omega", "ligolw"): _SNGL_BURST_OPTIONS,
    ("kleinewelle", "ligolw"): _SNGL_BURST_OPTIONS,
    ("omicron", "ligolw"): _SNGL_BURST_OPTIONS,
    ("omicron", "hdf5"): {
        "path": "triggers",
        "columns": ["time", "frequency", "snr"],
    },
    ("pycbc_live", "hdf5.pycbc_live"): {
        "extended_metadata": False,
        "loudest": True,
        "columns": ["end_time", "template_duration", "new_snr"],
    },
}


# -- find files/channels ------------------------------------------------------

[docs]def find_trigger_files(channel, etg, segments, **kwargs): """Find trigger files for a given channel and ETG Parameters ---------- channel : `str` name of channel to find etg : `str` name of event trigger generator to find segments : :class:`~ligo.segments.segmentlist` list of segments to find **kwargs all other keyword arguments are passed to `gwtrigfind.find_trigger_urls` Returns ------- cache : `list` of `str` cache of trigger file paths See Also -------- gwtrigfind.find_trigger_urls for details on file discovery """ # format arguments etg = _sanitize_name(etg) try: readfmt = kwargs.pop("format", DEFAULT_FORMAT[etg]) except KeyError: raise ValueError("unsupported ETG {!r}".format(etg)) for key, val in DEFAULT_TRIGFIND_OPTIONS.get((etg, readfmt), {}).items(): kwargs.setdefault(key, val) cache = [] for start, end in segments: try: cache.extend(gwtrigfind.find_trigger_files(channel, etg, start, end, **kwargs)) except ValueError as e: if str(e).lower().startswith('no channel-level directory'): warnings.warn(str(e)) else: raise # sanitise URLs (remove 'file://' prefix, etc) cache = file_list(cache) # remove 'empty' pycbc files from the cache if etg == "pycbc_live": ifo = channel.split(":", 1)[0] cache = filter_empty_pycbc_files(cache, ifo=ifo) return type(cache)(OrderedDict.fromkeys(cache))
re_delim = re.compile('[_-]')
[docs]def find_auxiliary_channels(etg, gps='*', ifo='*', cache=None): """Find all auxiliary channels processed by a given ETG If `cache=None` is given (default), the channels are parsed from the ETG archive under ``/home/detchar/triggers``. Otherwise, the channel names are parsed from the files in the `cache`, assuming they follow the T050017 file-naming convention. Parameters ---------- etg : `str` name of the trigger generator gps : `int`, `tuple`, optional GPS reference time, or pair of times indicating [start, end), at which to find channels ifo : `str`, optional interferometer prefix for which to find channels cache : `list` of `str`, optional `list` of files from which to parse channels Returns ------- channels : `list` of `str` the names of all available auxiliary channels """ out = set() if cache is not None: for url in cache: try: ifo, name = os.path.basename(url).split('-')[:2] except AttributeError: # CacheEntry ifo = url.observatory name = url.description channel = '%s:%s' % (ifo, name.replace('_', '-', 1)) if channel.lower().endswith(etg.lower()): channel = channel[:-len(etg)] out.add(u'%s' % channel.rstrip('_')) else: if not isinstance(gps, (list, tuple)): gps = (gps, gps) gps5 = int(str(gps[0])[:5]) gpse = int(str(gps[-1])[:5]) while gps5 <= gpse and gps5 * 1e5 < gps[-1]: channels = glob.glob(os.path.join( '/home/detchar/triggers', ifo, '*', str(gps5))) if len(channels) == 0: # try old convention channels = glob.glob(os.path.join( '/home/detchar/triggers', '*', ifo, '*', str(gps5))) use_o2 = False else: use_o2 = True stub = '_%s' % etg.lower() for path in channels: path = os.path.split(path)[0] if not path.lower().endswith('_%s' % etg.lower()): continue ifo, name = path[:-len(stub)].rsplit(os.path.sep)[-2:] if use_o2: out.add(u'%s:%s' % (ifo, name.replace('_', '-', 1))) else: out.add(u'%s:%s' % (ifo, name)) gps5 += 1 return sorted(out)
# -- read --------------------------------------------------------------------- def _sanitize_name(name): return re.sub(r"[-_.]", "_", name).lower() def _format_params(channel, etg, fmt, trigfind_kwargs, read_kwargs): # format kwargs for trigfind if trigfind_kwargs is None: trigfind_kwargs = {} for key, val in DEFAULT_TRIGFIND_OPTIONS.get((etg, fmt), {}).items(): trigfind_kwargs.setdefault(key, val) # format default read kwargs for key, val in DEFAULT_READ_OPTIONS.get((etg, fmt), {}).items(): read_kwargs.setdefault(key, val) read_kwargs.setdefault("format", fmt) # format params for key in read_kwargs: if key.endswith(('columns', 'names', 'branches')) and isinstance(read_kwargs[key], str): read_kwargs[key] = [x.strip() for x in read_kwargs[key].split(',')] # custom params for ETGs if etg == "pycbc_live": read_kwargs.setdefault("ifo", channel.split(":", 1)[0]) return trigfind_kwargs, read_kwargs
[docs]def get_triggers(channel, etg, segments, cache=None, snr=None, frange=None, raw=False, extra_times=None, trigfind_kwargs={}, **read_kwargs): """Get triggers for the given channel """ etg = _sanitize_name(etg) # format arguments try: readfmt = read_kwargs.pop("format", DEFAULT_FORMAT[etg]) except KeyError: raise ValueError("unsupported ETG {!r}".format(etg)) if not readfmt and cache: # try to identify format from files in cache readfmt = get_read_format(EventTable, cache[0], {}, {}) trigfind_kwargs, read_kwargs = _format_params( channel, etg, readfmt, trigfind_kwargs, read_kwargs ) # find triggers if cache is None: cache = find_trigger_files(channel, etg, segments, **trigfind_kwargs) # read files tables = [] for segment in segments: segaslist = SegmentList([segment]) segcache = io_cache.sieve(cache, segment=segment) # try and work out if cache overextends segment (so we need to crop) cachesegs = io_cache.cache_segments(segcache) outofbounds = abs(cachesegs - segaslist) if segcache: # if len(segcache) == 1: # just pass the single filename # segcache = segcache[0] # read trigger files one by one so we can ignore empty ones # new = EventTable.read(segcache, **read_kwargs) new: EventTable = None for trig_file in segcache: trig_tbl = EventTable.read(trig_file, **read_kwargs) if len(trig_tbl) > 0: if new: new = vstack_tables([new, trig_tbl]) else: new = trig_tbl else: pass # place for a breakpoint if new is not None and len(new) > 0: new.meta = {k: new.meta[k] for k in TABLE_META if new.meta.get(k)} if outofbounds: new = new[in_segmentlist(new[new.dtype.names[0]], segaslist)] tables.append(new) if len(tables): table = vstack_tables(tables) else: table = EventTable(names=read_kwargs.get( 'columns', ['time', 'frequency', 'snr'])) # parse time, frequency-like and snr-like column names columns = table.dtype.names tcolumn = columns[0] fcolumn = columns[1] scolumn = columns[2] # filter keep = numpy.ones(len(table), dtype=bool) if snr is not None: keep &= table[scolumn] >= snr if frange is not None: keep &= table[fcolumn] >= frange[0] keep &= table[fcolumn] < frange[1] table = table[keep] if extra_times: f_low = min(table[fcolumn]) s_low = min(table[scolumn]) for time in extra_times: extra_row = numpy.zeros(len(table.colnames)) extra_row[0] = time extra_row[1] = f_low extra_row[2] = s_low table.add_row(extra_row) # return basic table if 'raw' if raw: return table # rename time column so that all tables match in at least that if tcolumn != "time": table.rename_column(tcolumn, 'time') # add channel column to identify all triggers table.add_column(table.Column(data=numpy.repeat(channel, len(table)), name='channel')) table.sort('time') return table