# Source code for darclight.io

"""Module to manage general information like path to data"""
import os
from glob import iglob
from fnmatch import fnmatch
from collections.abc import Iterable
from collections import defaultdict
import logging
from functools import cached_property
from pathlib import Path
from typing import Generator, Tuple
import numpy as np
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
import astropy.units as u

# Module-level logger, named after the importing module.
logger = logging.getLogger(__name__)

# Image type -> lower-case substrings that identify this type in a header
# value. The entries are searched in insertion order, the first type with a
# matching substring wins.
COMMON_KEYWORDS = dict(
    BIAS=['bias', 'zero'],
    DARK=['dark'],
    FLAT=['flat'],
    LIGHT=['light', 'science', 'object'],
)


[docs] class DataCollection(): """Class that organizes all files in a given directory. """ def __init__(self, path:str|None=None, reduced_path:str|None=None, ignore:list|None=None, filelist:list[str]|None=None): # ensure that both paths are an Path object if path is None: self.path = Path('.') else: self.path = Path(path) if reduced_path is None: self.reduced_path = Path('./reduced') else: self.reduced_path = Path(reduced_path) # create the path if it does not exist self.reduced_path.mkdir(exist_ok=True) self.ignore = [] if ignore is None else ignore self.raw_files = Table() self.reduced_files = Table() self.scan(raw=True, reduced=True) logger.debug("DataCollection created") @staticmethod def _get_imagetype(keyword:str)->str: keyword = keyword.lower() for typ, common_kwd in COMMON_KEYWORDS.items(): if any([kwd in keyword for kwd in common_kwd]): return typ raise RuntimeError('The keyword "%s" could not be matched to any known type. ' \ 'Check the word or add a value to the "COMMON_KEYWORDS" variable.') def _scan_raw(self): # recreate the table self.raw_files = Table(names=('ID', 'FILENAME', 'TYPE', 'DATE-OBS', 'NIGHT', 'JD', 'EXPOSURE', 'FILTER', 'OBJECT'), dtype=('i4', 'U', 'U', 'U', 'U', 'f8', 'f4', 'U', 'U')) # check files for file in iglob(str(self.path)+'/**/*', recursive=True): if (os.path.isdir(file) or any((fnmatch(file, pat) for pat in self.ignore)) or fnmatch(file, f"*{self.reduced_path}*")): # skip if directory or contains a pattern from the ignore list or is an reduced file continue hdr = fits.getheader(file) self.raw_files.add_row((0, file, self._get_imagetype(hdr.get('IMAGETYP', 'None')), hdr.get('DATE-OBS', '0000-00-00T00:00:00'), '0000-00-00', hdr.get('JD', 0), hdr.get('EXPOSURE', -1), hdr.get('FILTER', 'None'), hdr.get('OBJECT', 'None'))) def _scan_reduced(self): self.reduced_files = Table(names=('ID', 'FILENAME', 'TYPE', 'DATE', 'NIGHT', 'EXPOSURE', 'FILTER', 'OBJECT', 'COMBINED'), dtype=('i4', 'U', 'U', 'U', 'U', 'f4', 'U', 'U', 'bool')) for file 
in iglob(str(self.reduced_path)+'/**/*', recursive=True): if (os.path.isdir(file) or any((fnmatch(file, pat) for pat in self.ignore))): continue hdr = fits.getheader(file) self.reduced_files.add_row((0, file, self._get_imagetype(hdr['IMAGETYP']), hdr.get('DATE', '1999-01-01T00:00:00.000'), '0000-00-00', hdr.get('EXPOSURE', -1), hdr.get('FILTER', 'None'), hdr.get('OBJECT', 'None'), hdr.get('COMBINED', False)))
[docs] def scan(self, raw:bool=True, reduced:bool=True): """scans the directories from scractch :param raw: whether or not the raw directory should be scaned, defaults to True :type raw: bool, optional :param reduced: whether or not the reduced directory should be scanned, defaults to True :type reduced: bool, optional """ if raw: self._scan_raw() # add value for the night --> avoid wrap over during midnight self.raw_files['NIGHT'] = np.array(( Time(self.raw_files['DATE-OBS'])-12*u.hour ).to_value('iso', subfmt='date')) # sort by time and add unique ID to each file self.raw_files.sort('DATE-OBS') self.raw_files['ID'] = np.arange(len(self.raw_files)) # group the files by necessary values # self.raw_files = self.raw_files.group_by(['TYPE', 'NIGHT', 'FILTER', 'EXPOSURE', 'TARGET']) if reduced: self._scan_reduced() # only update the data if at least one file is found if len(self.reduced_files) > 0: self.reduced_files['NIGHT'] = np.array(( Time(self.reduced_files['DATE'])-12*u.hour ).to_value('iso', subfmt='date')) self.reduced_files.sort('DATE') self.reduced_files['ID'] = np.arange(len(self.reduced_files))
[docs] def add_file(self, fname:str|Path, reduced:bool=False, row:tuple|None=None): """adds a row to an existing table :param fname: filename to add :type fname: str | Path :param reduced: if it should be added to the raw or reduced table, defaults to False :type reduced: bool, optional :param row: row informations, depends on raw or reduced what it should contain. If None the file will be read and the values are derived automatically, defaults to None :type row: tuple | None, optional """ table = self.reduced_files if reduced else self.raw_files fname = str(fname) if row is None: hdr = fits.getheader(fname) date = hdr.get('DATE', '2000-01-01T00:00:00.000') if reduced: row = (len(table)+1, fname, self._get_imagetype(hdr['IMAGETYP']), date, (Time(date)-12*u.hour).to_value('iso', subfmt='date'), hdr.get('EXPOSURE', -1), hdr.get('FILTER', 'None'), hdr.get('OBJECT', 'None'), hdr.get('COMBINED', False)) else: row = (len(table)+1, fname, self._get_imagetype(hdr.get('IMAGETYP', 'None')), hdr.get('DATE-OBS', '2000-01-01T00:00:00'), (Time(date)-12*u.hour).to_value('iso', subfmt='date'), hdr.get('JD', 0), hdr.get('EXPOSURE', -1), hdr.get('FILTER', 'None'), hdr.get('OBJECT', 'None')) table.add_row(row)
[docs] @staticmethod def hdu_from_file(file:str)->Tuple[np.ndarray, fits.header.Header]: """gives access to the data and header of a given file :param file: name of the file :type file: str :return: the data and header from that file :rtype: Tuple[np.ndarray,astropy.io.fits.header.Header] """ with fits.open(file) as hdul: # type: ignore data = hdul[0].data header = hdul[0].header return data, header
[docs] @staticmethod def save_file(filename:str|Path, data:np.ndarray, header:fits.header.Header|None=None)->None: """Saves the given data and header with the given filename in the reduced data directory. :param filename: Desired name for the file :type filename: str :param data: data that should be stored in the file :type data: np.ndarray :param header: header for the file, defaults to None :type header: astropy.io.fits.header.Header, optional """ hdu = fits.PrimaryHDU(data, header) hdul = fits.HDUList([hdu]) hdul.writeto(filename, overwrite=True) logger.debug("Saved the file '%s'", filename)
@property def used_filters(self)->list[str]: """List of used filters where a flat frame is available. :return: list of filters :rtype: list[str] """ flats = self.raw_files[self.raw_files['TYPE']=='FLAT'] return list({str(f) for f in flats['FILTER']}) @property def dark_exposures(self)->list[int]: """List of exposure times where a dark frame is directly available. :return: list of exposure times :rtype: list[int] """ darks = self.raw_files[self.raw_files['TYPE']=='DARK'] return list({int(e) for e in darks['EXPOSURE']}) @property def flat_exposures(self)->dict[str|None,set]: """exposure times for each filter :return: dictionary of the form {filter:exposure, ...} :rtype: dict[str|None,set] """ result = defaultdict(set) flats = self.raw_files[self.raw_files['TYPE']=='FLAT'] for filt, exp in flats['FILTER', 'EXPOSURE']: result[str(filt)].add(int(exp)) return dict(result) @property def targets(self)->list[str]: """List of targets captured. :return: list of targets :rtype: list[str] """ lights = self.raw_files[self.raw_files['TYPE']=='LIGHT'] return [str(obj) for obj in set(lights['OBJECT'])] @cached_property def light_meta(self)->dict[str,set[Tuple[str, int]]]: """Metadata for the light frames :return: dictionary of the form {target:[(filter,exposure),...],...}, for every target there is a set of tuples that each contain the filter and the corresponding exposure time. :rtype: dict[str,set[Tuple[str, int]]] """ result = defaultdict(set) lights = self.raw_files[self.raw_files['TYPE']=='LIGHT'] for target, filt, exp in lights['OBJECT', 'FILTER', 'EXPOSURE']: if exp is not None and filt is not None: result[target].add((filt, int(exp))) logger.debug("Created meta data for lights:\n\t%s", result) return dict(result)
[docs] def get_files(self, reduced:bool=False, **filters)->np.ndarray: """returns the filenames of the files specified :param reduced: whether the desired files should be reduced or not, defaults to False :type reduced: bool, optional :return: filenames satisfy the filters :rtype: np.ndarray """ table = self.reduced_files if reduced else self.raw_files mask = np.ones(len(table), dtype=bool) for filt, val in filters.items(): mask &= table[filt.upper()] == val files = np.array(table['FILENAME'][mask]) files = files[0] if len(files)==1 else files return files
@property def bias_files(self): return self.get_files(type='BIAS') @cached_property def dark_files(self): return {expo:self.get_files(type='DARK', exposure=expo) for expo in self.dark_exposures} @cached_property def flat_files(self): return {filt:self.get_files(type='FLAT', filter=filt) for filt in self.used_filters}
[docs] def get_master(self, imagetype:str, specifier:int|str|None=None, header=True)->np.ndarray|tuple|None: """looks for a specific stacked master frame :param imagetype: type of the image to check, use 'bias', 'dark', 'flat' or 'light' :type imagetype: str :param specifier: exposure time to look for, if imagetype='dark' or filter to look for if imagetype='flat' or target if imagetype='light, defaults to None :type specifier: int | str | None, optional :return: data of the required file, None if it does not exist :rtype: np.ndarray | None """ combined = np.array(self.reduced_files['COMBINED']) match imagetype.lower(): case 'bias': mask = np.array(self.reduced_files['TYPE']=='BIAS') & combined case 'dark': if specifier is None: # if no specifier is given any stacked dark will do mask = (np.array(self.reduced_files['TYPE']=='DARK') & combined) # ensure that the specifier has the correct type elif isinstance(specifier, (int, float)): mask = (np.array(self.reduced_files['TYPE']=='DARK') & np.array(self.reduced_files['EXPOSURE']==specifier) & combined) else: raise ValueError("The specifier has to be an int or float, "+ f"you provided {type(specifier)}") case 'flat': if specifier is None: # if no specifier is given any flat will do mask = (np.array(self.reduced_files['TYPE']=='FLAT') & combined) # ensure that the specifier has the correct type elif isinstance(specifier, str): mask = (np.array(self.reduced_files['TYPE']=='FLAT') & np.array(self.reduced_files['FILTER']==specifier) & combined) else: raise ValueError(f"The specifier has to be a string, you provided {type(specifier)}.") case 'light': if specifier is None: # if no specifier is given any flat will do mask = (np.array(self.reduced_files['TYPE']=='FLAT') & combined) # ensure that the specifier has the correct type elif isinstance(specifier, int): mask = (np.array(self.reduced_files['TYPE']=='FLAT') & np.array(self.reduced_files['FILTER']==specifier) & combined) else: raise ValueError(f"The specifier has to be a 
string, you provided {type(specifier)}.") case _: raise ValueError(f"You provided an invalid imagetype '{imagetype}', use 'bias', 'dark' or 'flat'") if np.sum(mask) == 0: return None elif np.sum(mask) == 1: idx = np.nonzero(mask) data, hdr = self.hdu_from_file(str(self.reduced_files['FILENAME'][idx][0])) if header: return data, header return data else: # too many matches files = [str(f) for f in self.reduced_files['FILENAME'][mask]] raise RuntimeError(f"Found {len(files)} frames matching ({files})." + "Include a specifier or ensure only one master frame for the given specifier exists.")
[docs] @staticmethod def file_data(filelist:Iterable[str], data:bool=True, header:bool=False, fname:bool=False, return_kwds:list[str]|None=None, **filter_kwds)->Generator: """generator to get the data and/or header of the files in the provided list. :param filelist: list of files to iterate over :type filelist: list :param data: whether or not the data of the file should be returned, defaults to True :type data: bool, optional :param header: whether or not the header should be returned, defaults to False :type header: bool, optional :param fname: whether or not the filename should be returned, defaults to False :type fname: bool, optional :param return_kwds: additional keywords that should be returned alongside the data/header, defaults to None :type return_kwds: list[str] | None :param filter_kwds: additional keywords the returned data should be filtered for from the header :raises ValueError: if both (data and header) are set to False. If you want only the filenames address the attribute directly. :yield: tuple of the desired outputs in the order (data, header, filename, return_kwd 1,...) :rtype: Tuple """ # TODO: include the possibility to only yield keywords? if not data and not header: raise ValueError("At least one of 'data' and 'header' must be True." \ f"You provided: data={data} and header={header}.") return_kwd = [] if return_kwds is None else return_kwds kwds = [] for file in filelist: out = [] if filter_kwds or return_kwd: hdr = fits.getheader(file) if any(hdr.get(kwd) != val for kwd, val in filter_kwds.items()): # skip if the keywords do not match continue for kwd in return_kwd: kwds.append(hdr.get(kwd)) if data: out.append(fits.getdata(file) if data else None) if header: out.append(fits.getheader(file) if header else None) if fname: out.append(file if fname else None) out = out + kwds # add the desired keywords to the end yield tuple(out) if len(out)>1 else out[0]
[docs] def bias(self, data:bool=True, header:bool=False, fname:bool=False, **filter_kwds)->Generator: """Generator to get the data and/or header of the files in the raw bias frames. :param data: whether or not the data of the file should be returned, defaults to True :type data: bool, optional :param header: whether or not the header should be returned, defaults to False :type header: bool, optional :param fname: whether or not the filename should be returned, defaults to False :type fname: bool, optional :raises ValueError: if both (data and header) are set to False. If you want only the filenames address the attribute directly. :yield: tuple of the desired outputs in the order (data, header, filename) :rtype: Tuple """ bias = self.raw_files[self.raw_files['TYPE']=='BIAS'] bias_files = np.array(bias['FILENAME']) return self.file_data(bias_files, data, header, fname, **filter_kwds)
[docs] def darks(self, exposure:int, data:bool=True, header:bool=False, fname:bool=False, **filter_kwds)->Generator: """Generator to get the data and/or header of the files of the raw dark frames for a specific exposure. :param exposure: the exposure time of the dark frame :type exposure: int :param data: whether or not the data of the file should be returned, defaults to True :type data: bool, optional :param header: whether or not the header should be returned, defaults to False :type header: bool, optional :param fname: whether or not the filename should be returned, defaults to False :type fname: bool, optional :raises ValueError: This error is raised if there is no dark frame with the given exposure registered. Try 'update_raw()' if you think there should be one :raises ValueError: if both (data and header) are set to False. If you want only the filenames address the attribute directly. :yield: tuple of the desired outputs in the order (data, header, filename) :rtype: Tuple """ if exposure not in self.dark_exposures: raise ValueError(f"There is no dark frame for this exposure: {exposure}") darks = self.raw_files[self.raw_files['TYPE']=='DARK'] dark_files = np.array(darks['FILENAME'][darks['EXPOSURE']==exposure]) return self.file_data(dark_files, data, header, fname, **filter_kwds)
[docs] def flats(self, used_filter:str|None, data:bool=True, header:bool=False, fname:bool=False, return_kwds:list[str]|None=None, **filter_kwds)->Generator: """Generator to get the data and/or header of the files of the raw flat frames for a specific filter. :param used_filter: the exposure time of the flat frame :type used_filter: str | None :param data: whether or not the data of the file should be returned, defaults to True :type data: bool, optional :param header: whether or not the header should be returned, defaults to False :type header: bool, optional :param fname: whether or not the filename should be returned, defaults to False :type fname: bool, optional :raises ValueError: This error is raised if there is no flat frame with the given filter registered. Try 'update_raw()' if you think there should be one :raises ValueError: if both (data and header) are set to False. If you want only the filenames address the attribute directly. :yield: tuple of the desired outputs in the order (data, header, filename) :rtype: Tuple """ if used_filter not in self.used_filters: raise ValueError(f"There is no flat frame for this filter: {used_filter}") flat_files = self.get_files(type='FLAT', filter=used_filter) return self.file_data(flat_files, data, header, fname, return_kwds, **filter_kwds)
[docs] def lights(self, target:str, data:bool=True, header:bool=False, fname:bool=False, return_kwds:list[str]|None=None, reduced:bool=False, **filter_kwds)->Generator: """Generator to get the data and/or header of the files of the raw light frames for a specific target. :param target: the target of the light frame :type target: str :param data: whether or not the data of the file should be returned, defaults to True :type data: bool, optional :param header: whether or not the header should be returned, defaults to False :type header: bool, optional :param fname: whether or not the filename should be returned, defaults to False :type fname: bool, optional :raises ValueError: This error is raised if there is no light frame with the given target registered. Try 'update_raw()' if you think there should be one :raises ValueError: if both (data and header) are set to False. If you want only the filenames address the attribute directly. :yield: tuple of the desired outputs in the order (data, header, filename) :rtype: Tuple """ table = self.reduced_files if reduced else self.raw_files if target not in self.targets: raise ValueError(f"There is no light frame for the given target: {target}") light_files = self.get_files(reduced=reduced, type='LIGHT', object=target) return self.file_data(light_files, data, header, fname, return_kwds, **filter_kwds)