# -*- coding: utf-8 -*-
# Copyright (C) Joshua Smith (2016)
#
# This file is part of the hveto python package.
#
# hveto is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# hveto is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with hveto. If not, see <http://www.gnu.org/licenses/>.
"""Create a (temporary) cache of event triggers for analysis by `hveto`
This method will apply the minimal SNR thresholds and any frequency cuts
as given in the configuration files
"""
import h5py
import os
import multiprocessing
import sys
import warnings
from pathlib import Path
from astropy.table import vstack
from gwpy.io.cache import (
cache_segments,
file_segment,
read_cache,
)
from gwpy.segments import (
Segment,
SegmentList,
DataQualityFlag,
DataQualityDict,
)
from gwdetchar import cli
from .. import (__version__, config)
from ..triggers import (
get_triggers,
find_auxiliary_channels,
find_trigger_files,
)
from ..utils import write_lal_cache
__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
IFO = os.getenv('IFO')
# set up logger
PROG = ('python -m hveto.cli.cache_events' if sys.argv[0].endswith('.py')
else os.path.basename(sys.argv[0]))
LOGGER = cli.logger(name=PROG.split('python -m ').pop())
# -- parse command line -------------------------------------------------------
def _abs_path(p):
return Path(p).expanduser().resolve()
[docs]def create_parser():
"""Create a command-line parser for this entry point
"""
parser = cli.create_parser(
prog=PROG,
description=__doc__,
version=__version__,
)
# gwdetchar standard arguments/options
cli.add_gps_start_stop_arguments(parser)
cli.add_ifo_option(parser, required=IFO is None, ifo=IFO)
cli.add_nproc_option(parser, default=1)
# custom options
parser.add_argument(
'-f',
'--config-file',
action='append',
default=[],
type=_abs_path,
help=('path to hveto configuration file, can be given '
'multiple times (files read in order)'),
)
parser.add_argument(
'-p',
'--primary-cache',
default=None,
type=_abs_path,
help='path for cache containing primary channel files',
)
parser.add_argument(
'-a',
'--auxiliary-cache',
default=None,
type=_abs_path,
help=('path for cache containing auxiliary channel files, '
'files contained must be T050017-compliant with the '
'channel name as the leading name parts, e.g. '
'\'L1-GDS_CALIB_STRAIN_<tag>-<start>-<duration>.'
'<ext>\' for L1:GDS-CALIB_STRAIN triggers'),
)
parser.add_argument(
'-S',
'--analysis-segments',
action='append',
default=[],
type=_abs_path,
help=('path to LIGO_LW XML file containing segments for '
'the analysis flag (name in segment_definer table '
'must match analysis-flag in config file)'),
)
parser.add_argument(
'--append',
action='store_true',
default=False,
help=('append to existing cached event files, otherwise, '
'start from scratch (default)'),
)
# output options
pout = parser.add_argument_group('Output options')
pout.add_argument(
'-o',
'--output-directory',
default=os.curdir,
type=_abs_path,
help='path of output directory, default: %(default)s',
)
# return the parser
return parser
# -- main code block ----------------------------------------------------------
[docs]def main(args=None):
"""Run the cache_events tool
"""
parser = create_parser()
args = parser.parse_args(args=args)
ifo = args.ifo
start = int(args.gpsstart)
end = int(args.gpsend)
duration = end - start
LOGGER.info("-- Welcome to Hveto --")
LOGGER.info("GPS start time: %d" % start)
LOGGER.info("GPS end time: %d" % end)
LOGGER.info("Interferometer: %s" % ifo)
# -- initialisation -------------------------------
# read configuration
cp = config.HvetoConfigParser(ifo=args.ifo)
cp.read(map(str, args.config_file))
LOGGER.info("Parsed configuration file(s)")
# format output directory
outdir = args.output_directory
outdir.mkdir(parents=True, exist_ok=True)
LOGGER.info("Working directory: {}".format(outdir))
trigdir = outdir / 'triggers'
trigdir.mkdir(parents=True, exist_ok=True)
# get segments
aflag = cp.get('segments', 'analysis-flag')
url = cp.get('segments', 'url')
padding = cp.getfloats('segments', 'padding')
if args.analysis_segments:
segs_ = DataQualityDict.read(args.analysis_segments, gpstype=float)
analysis = segs_[aflag]
span = SegmentList([Segment(start, end)])
analysis.active &= span
analysis.known &= span
analysis.coalesce()
LOGGER.debug("Segments read from disk")
else:
analysis = DataQualityFlag.query(aflag, start, end, url=url)
LOGGER.debug("Segments recovered from %s" % url)
analysis.pad(*padding)
livetime = int(abs(analysis.active))
livetimepc = livetime / duration * 100.
LOGGER.info("Retrieved %d segments for %s with %ss (%.2f%%) livetime"
% (len(analysis.active), aflag, livetime, livetimepc))
snrs = cp.getfloats('hveto', 'snr-thresholds')
minsnr = min(snrs)
# -- utility methods ------------------------------
def create_path(channel):
ifo, name = channel.split(':', 1)
name = name.replace('-', '_')
return trigdir / "{}-{}-{}-{}.h5".format(ifo, name, start, duration)
def read_and_cache_events(channel, etg, cache=None, trigfind_kw={},
**read_kw):
cfile = create_path(channel)
# read existing cached triggers and work out new segments to query
if args.append and cfile.is_file():
previous = DataQualityFlag.read(
str(cfile),
path='segments',
format='hdf5',
).coalesce()
new = analysis - previous
else:
new = analysis.copy()
# get cache of files
if cache is None:
cache = find_trigger_files(channel, etg, new.active, **trigfind_kw)
else:
cache = list(filter(
lambda e: new.active.intersects_segment(file_segment(e)),
cache,
))
# restrict 'active' segments to when we have data
try:
new.active &= cache_segments(cache)
except IndexError:
new.active = type(new.active)()
# find new triggers
try:
trigs = get_triggers(channel, etg, new.active, cache=cache,
raw=True, **read_kw)
# catch error and continue
except ValueError as e:
warnings.warn('%s: %s' % (type(e).__name__, str(e)))
else:
path = write_events(channel, trigs, new)
try:
return path, len(trigs)
except TypeError: # None
return
def write_events(channel, tab, segments):
"""Write events to file with a given filename
"""
# get filename
path = create_path(channel)
h5f = h5py.File(str(path), 'a')
# read existing table from file
try:
old = tab.read(h5f["triggers"], format="hdf5")
except KeyError:
pass
else:
tab = vstack(old, tab)
# append event table
tab.write(h5f, path="triggers", append=True, overwrite=True)
# write segments
try:
oldsegs = DataQualityFlag.read(h5f, path="segments", format="hdf5")
except KeyError:
pass
else:
segments = oldsegs + segments
segments.write(h5f, path="segments", append=True, overwrite=True)
# write file to disk
h5f.close()
return path
# -- load channels --------------------------------
# get primary channel name
pchannel = cp.get('primary', 'channel')
# read auxiliary cache
if args.auxiliary_cache is not None:
acache = [e for c in args.auxiliary_cache for e in read_cache(str(c))]
else:
acache = None
# load auxiliary channels
auxetg = cp.get('auxiliary', 'trigger-generator')
auxfreq = cp.getfloats('auxiliary', 'frequency-range')
try:
auxchannels = cp.get('auxiliary', 'channels').strip('\n').split('\n')
except config.configparser.NoOptionError:
auxchannels = find_auxiliary_channels(auxetg, start, ifo=args.ifo,
cache=acache)
# load unsafe channels list
_unsafe = cp.get('safety', 'unsafe-channels')
if os.path.isfile(_unsafe): # from file
unsafe = set()
with open(_unsafe, 'rb') as f:
for c in f.read().rstrip(b'\n').split(b'\n'):
if c.startswith(b'%(IFO)s'):
unsafe.add(c.replace(b'%(IFO)s', ifo))
elif not c.startswith('%s:' % ifo):
unsafe.add('%s:%s' % (ifo, c))
else:
unsafe.add(c)
else: # or from line-seprated list
unsafe = set(_unsafe.strip('\n').split('\n'))
unsafe.add(pchannel)
cp.set('safety', 'unsafe-channels', '\n'.join(sorted(unsafe)))
LOGGER.debug("Read list of %d unsafe channels" % len(unsafe))
# remove duplicates
auxchannels = sorted(set(auxchannels))
LOGGER.debug("Read list of %d auxiliary channels" % len(auxchannels))
# remove unsafe channels
nunsafe = 0
for i in range(len(auxchannels) - 1, -1, -1):
if auxchannels[i] in unsafe:
LOGGER.warning("Auxiliary channel %r identified as unsafe and has "
"been removed" % auxchannels[i])
auxchannels.pop(i)
nunsafe += 1
LOGGER.debug("%d auxiliary channels identified as unsafe" % nunsafe)
naux = len(auxchannels)
LOGGER.info("Identified %d auxiliary channels to process" % naux)
# -- load primary triggers -------------------------
LOGGER.info("Reading events for primary channel...")
# read primary cache
if args.primary_cache is not None:
pcache = [e for c in args.primary_cache for e in read_cache(str(c))]
else:
pcache = None
# get primary params
petg = cp.get('primary', 'trigger-generator')
psnr = cp.getfloat('primary', 'snr-threshold')
pfreq = cp.getfloats('primary', 'frequency-range')
preadkw = cp.getparams('primary', 'read-')
ptrigfindkw = cp.getparams('primary', 'trigfind-')
# load primary triggers
out = read_and_cache_events(pchannel, petg, snr=psnr, frange=pfreq,
cache=pcache, trigfind_kw=ptrigfindkw,
**preadkw)
try:
e, n = out
except TypeError:
e = None
n = 0
if n:
LOGGER.info("Cached %d new events for %s" % (n, pchannel))
elif args.append and e.is_file():
LOGGER.info("Cached 0 new events for %s" % pchannel)
else:
message = "No events found for %r in %d seconds of livetime" % (pchannel, livetime)
LOGGER.critical(message)
# write primary to local cache
pname = trigdir / '{}-HVETO_PRIMARY_CACHE-{}-{}.lcf'.format(
ifo, start, duration,
)
write_lal_cache(str(pname), [e])
LOGGER.info('Primary cache written to {}'.format(pname))
# -- load auxiliary triggers -----------------------
LOGGER.info("Reading triggers for aux channels...")
counter = multiprocessing.Value('i', 0)
areadkw = cp.getparams('auxiliary', 'read-')
atrigfindkw = cp.getparams('auxiliary', 'trigfind-')
def read_and_write_aux_triggers(channel):
if acache is None:
auxcache = None
else:
ifo, name = channel.split(':')
match = "{}-{}".format(ifo, name.replace('-', '_'))
auxcache = [e for e in acache if Path(e).name.startswith(match)]
out = read_and_cache_events(
channel, auxetg, cache=auxcache, snr=minsnr, frange=auxfreq,
trigfind_kw=atrigfindkw, **areadkw)
try:
e, n = out
except TypeError:
e = None
n = 0
# log result of load
with counter.get_lock():
counter.value += 1
tag = '[%d/%d]' % (counter.value, naux)
if e is None: # something went wrong
LOGGER.critical(" %s Failed to read events for %s"
% (tag, channel))
else: # either read events or nothing new
LOGGER.debug(" %s Cached %d new events for %s"
% (tag, n, channel))
return e
# map with multiprocessing
if args.nproc > 1:
pool = multiprocessing.Pool(processes=args.nproc)
results = pool.map(read_and_write_aux_triggers, auxchannels)
pool.close()
# map without multiprocessing
else:
results = map(read_and_write_aux_triggers, auxchannels)
acache = [x for x in results if x is not None]
aname = trigdir / '{}-HVETO_AUXILIARY_CACHE-{}-{}.lcf'.format(
ifo, start, duration,
)
write_lal_cache(str(aname), acache)
LOGGER.info('Auxiliary cache written to {}'.format(aname))
# -- finish ----------------------------------------
LOGGER.info('Done, you can use these cache files in an hveto analysis by '
'passing the following arguments:\n\n--primary-cache {} '
'--auxiliary-cache {}\n'.format(pname, aname))
# -- run code -----------------------------------------------------------------
if __name__ == "__main__":
main()