"""This module provides tools for data reduction"""
import os
from typing import Callable
import logging
import warnings
import numpy as np
from astropy.stats import sigma_clip
from .io import DataCollection
logger = logging.getLogger(__name__)
[docs]
class Reducer():
"""Class for general reduction and correction of the provided data
"""
def __init__(self, data:DataCollection):
self.data:DataCollection = data
self.master_bias:np.ndarray|None = None
self.master_darks:dict[int,np.ndarray|None] = {exp:None for exp in self.data.dark_exposures}
self.master_flats:dict[str|None,np.ndarray|None] = {filt:None for filt in self.data.used_filters}
self.master_lights:dict[str,np.ndarray|None] = {tar:None for tar in self.data.targets}
logger.info("Finished reducer initialisation.")
[docs]
@staticmethod
def combine(data:list[np.ndarray], method:str='mean', sigmaclip:bool=True, sigma:int=5)->np.ndarray:
"""combines the given data to one 2D-array
:param data: list of the image arrays
:type data: list[np.ndarray]
:param method: which method to use for the combination,
valid are 'mean', 'median' and 'add',
defaults to 'mean'
:type method: str, optional
:param sigmaclip: whether or not the data should be clipped or not, defaults to True
:type sigmaclip: bool, optional
:param sigma: standard deviation used for sigma clipping, defaults to 5
:type sigma: int, optional
:return: returns the combined data array
:rtype: np.ndarray
"""
logger.debug("Started combination with parameters:\n"+
"\tnumber of images: %d\n"+
"\tmethod: %s\n"+
"\tsigmaclip: %s\n"+
"\tsigma: %s",
len(data), method, sigmaclip, sigma)
if len(data) == 1:
warnings.warn("WARNING: only one file in the list!")
logger.warning('only one file in the list --> no combination possible')
return data[0]
stack = np.stack(data)
if sigmaclip:
stack = sigma_clip(stack, sigma=sigma, axis=0)
match method:
case 'mean':
return np.array(np.mean(stack, axis=0)) # type: ignore
case 'median':
return np.array(np.median(stack, axis=0)) # type: ignore
case 'add':
return np.array(np.sum(stack, axis=0)) # type: ignore
case _:
raise ValueError("You provided an invalid argument for the combination method.")
[docs]
@staticmethod
def generate_filename(frame:str, obj:str|None=None,
filt:str|None=None, exposure:str|None=None,
suffix:str|None=None)->str:
"""generates a suitable filename for a masterframe based on the given values.
The name follows the following pattern: 'master_<frame>_<obj>_<exposure>s.fits'
:param frame: name of the frame, for example 'bias', can be anything
:type frame: str
:param obj: name of the object in the frame, defaults to None
:type obj: str, optional
:param filt: the used filter in the frame, defaults to None
:type filt: str, optional
:param exposure: duration of the exposure, defaults to None
:type exposure: str, optional
:param suffix: additional suffix that should be added, defaults to None
:type suffix: str, optional
:return: the name of the file in the format 'master_<frame>_<obj>_<exposure>s.fits'
:rtype: str
"""
# generate the right filename depending on what arguments are given
file_name: str = f"master_{frame}"
if obj is not None:
file_name = f"{file_name}_{obj}"
if filt is not None:
file_name = f"{file_name}_filter_{filt}"
if exposure is not None:
file_name = f"{file_name}_{exposure}s"
if suffix is not None:
file_name = f"{file_name}_{suffix}s"
file_name = f"{file_name}.fits".replace(" ", "_")
return file_name
[docs]
def create_master_bias(self, force_new:bool=False, suffix:str|None=None, **kwargs)->np.ndarray:
"""Creates a master bias by combining the registered files if it
does not exist yet and saves it.
:param force_new: whether or not a new master file should be forced, defaults to False
:type force_new: bool, optional
:param suffix: suffix to add to the file, defaults to None
:type suffix: str, optional
:param kwargs: additional keyword arguments passed to 'combine'
:return: the combined data
:rtype: np.ndarray
"""
logger.info("Started creation of master bias.")
# check if master bias exists
if self.master_bias is not None and not force_new:
# there is no master created yet and don't force a new one
logger.debug("Master bias already loaded.")
return self.master_bias
if force_new or self.data.get_master('bias') is None:
# collect data
biases, fnames = zip(*self.data.bias(fname=True))
biases = list(biases)
_, header = self.data.hdu_from_file(self.data.bias_files[0])
# update header
# TODO: more detailed header update
header['COMBINED'] = True
header['NCOMBINE'] = len(biases)
# stack the frames and save
master = self.combine(biases, **kwargs)
file_name = self.generate_filename('bias', suffix=suffix)
self.data.save_file(self.data.reduced_path/file_name, master, header)
logger.debug("Combined %s frames to one master bias.\n"+
"Master filename: \t%s\n"+
"The used frames are:\n\t%s", len(fnames), file_name, '\n\t'.join(fnames))
# update the masters
self.data.add_file(self.data.reduced_path/file_name, reduced=True)
else:
logger.info("Loaded master bias from file: %s",
str(self.data.reduced_path/self.data.get_files(True, combined=True, type='BIAS')))
master = self.data.get_master('bias', header=False)
self.master_bias = master
logger.info("Finished master bias creation.")
return self.master_bias
[docs]
def create_master_dark(self, exposure:int=-1, force_new:bool=False,
suffix:str|None=None, **kwargs)->np.ndarray|None:
"""Creates master darks by combining the registered files if they
do not exist yet and saves them.
:param exposure: the exposure time for which the master dark should be created
a time of -1 means that for every exposure registered the function is
called recursevly, defaults to -1
:type exposure: int, optional
:param force_new: whether or not a new master file should be forced, defaults to False
:type force_new: bool, optional
:param kwargs: additional keyword arguments passed to 'combine'
:return: the combined data or None if the exposure time is set to -1
:rtype: np.ndarray | None
"""
logger.info("Started creation of master dark for exposure: %s", exposure)
# create a master frame for every exposure
if exposure == -1:
for exp in self.data.dark_exposures:
self.create_master_dark(exp, force_new, **kwargs)
return None
# check if master dark exists
if self.master_darks[exposure] is not None and not force_new:
logger.debug("Master dark already loaded.")
return self.master_darks[exposure]
if force_new or self.data.get_master('dark', specifier=exposure) is None:
# collect data
darks, fnames = zip(*self.data.darks(exposure, fname=True))
darks = list(darks)
_, header = self.data.hdu_from_file(self.data.dark_files[exposure][0])
# correction
if self.master_bias is None:
logger.info("There is no master bias for the correction loaded.")
self.create_master_bias(**kwargs)
darks = [d-self.master_bias for d in darks]
# update the header
# TODO: more detailed header update
header['COMBINED'] = True
header['NCOMBINE'] = len(darks)
# stack the frames and save
master = self.combine(darks, **kwargs)
file_name = self.generate_filename('dark', exposure=str(exposure), suffix=suffix)
self.data.save_file(self.data.reduced_path/file_name, master, header)
logger.debug("Combined %s frames to one master dark.\n"+
"Master filename: \t%s\n"+
"Exposure: %s\n"+
"The used frames are:\n\t%s", len(fnames), file_name, exposure, '\n\t'.join(fnames))
# update the masters
self.data.add_file(self.data.reduced_path/file_name, reduced=True)
self.master_darks[exposure] = master
else:
logger.info("Loaded master dark from file: %s",
str(self.data.reduced_path/self.data.get_files(True, type='DARK',
combined=True, exposure=exposure)))
path = self.data.reduced_path / self.data.get_files(True, type='DARK', combined=True, exposure=exposure)
data, _ = self.data.hdu_from_file(path)
self.master_darks[exposure] = data
return self.master_darks[exposure]
def __scale_dark(self, target_header, **kwargs)->np.ndarray:
"""reads the header, finds the best master darks and scales it to match the exposure time
:param target_header: header of the target image
:type target_header: fits.io.header.Header
:return: scaled master dark
:rtype: np.ndarray
"""
target_time = int(target_header.get('EXPOSURE'))
exposures = self.data.dark_exposures
idx = np.searchsorted(sorted(exposures), target_time, side='left')
best_time = exposures[idx] if idx<len(exposures) else exposures[-1]
mdark = self.master_darks[best_time]
mdark = mdark if mdark is not None else self.create_master_dark(best_time, **kwargs)
mdark = self.master_darks[best_time] * target_time / best_time
return mdark
[docs]
def create_master_flats(self, used_filter:str='all', norm:Callable|None=None,
force_new:bool=False, suffix:str|None=None, **kwargs)->np.ndarray|None:
"""Creates master flats by combining the registered files if they
do not exist yet and saves them.
:param used_filter: the filter for which the master flat should be created
a value of 'all' means that for every filter registered, the function is
called recursevly, defaults to 'all'
:type used_filter: str, optional
:param norm: function used to normalize each frame,
should take a 2d array as input and return the normalized array,
defaults to None
:type norm: Callable | None, optional
:param force_new: whether or not a new master file should be forced, defaults to False
:type force_new: bool, optional
:param kwargs: additional keyword arguments passed to 'combine'
:return: the combined data or None if the used_filter is set to 'all'
:rtype: np.ndarray | None
"""
logger.info("Started creation of master flat for filter: %s", used_filter)
# ensure that the norm is defined
norm = inv_median if norm is None else norm
# execute the function for every filter
if used_filter == 'all':
for filt in self.data.used_filters:
self.create_master_flats(filt, norm, force_new, **kwargs)
return None
# check if a master flat is loaded
if self.master_flats[used_filter] is not None and not force_new:
logger.debug("Master flat already loaded.")
return self.master_flats[used_filter]
# check if a flat exists
if not force_new and self.data.get_master('flat', specifier=used_filter) is not None:
logger.info("Loaded master flat from file: %s",
str(self.data.reduced_path/self.data.get_files(True, type='FLAT',
combined=True, filter=used_filter)))
path = self.data.reduced_path / self.data.get_files(True, type='FLAT',
combined=True, filter=used_filter)
data, _ = self.data.hdu_from_file(path)
self.master_flats[used_filter] = data
else:
# collect data
flats, fnames = zip(*self.data.flats(used_filter, fname=True))
flats = list(flats)
_, header = self.data.hdu_from_file(self.data.flat_files[used_filter][0])
# correction
if self.master_bias is None:
logger.info("There is no master bias for the correction loaded.")
mbias = self.create_master_bias(**kwargs)
else:
mbias = self.master_bias
# find best dark and scale
mdark = self.__scale_dark(header, **kwargs)
flats = [f-mbias-mdark for f in flats]
# update header
# TODO: more detailed header update
header['COMBINED'] = True
header['NCOMBINE'] = len(flats)
# stack the frames and save
master = self.combine(flats, **kwargs)
if norm is not None:
master = norm(master)
logger.info("The flat frame is normalized.")
file_name = self.generate_filename('flat', filt=used_filter, suffix=suffix)
self.data.save_file(self.data.reduced_path/file_name, master, header)
logger.debug("Combined %s frames to one master flat.\n"+
"Master filename: \t%s\n"+
"Filter: %s\n"+
"The used frames are:\n\t%s", len(fnames), file_name, used_filter, '\n\t'.join(fnames))
# update the masters
self.data.add_file(self.data.reduced_path/file_name, reduced=True)
self.master_flats[used_filter] = master
return self.master_flats[used_filter]
[docs]
def reduce_lights(self, target:str='all', force_new:bool=False,
**kwargs)->None:
"""creates reduced light frames and saves them individually for later stacking/analysis
:param target: the object for which the frames should be calibrated,
a value of 'all' means that the functioin is called recursevly for ever registered target,
defaults to 'all'
:type target: str, optional
:param force_new: wherther or not the frames should be overwritten if they exist,
defaults to False
:type force_new: bool, optional
:raises RuntimeError: raised if force_new=False and a file with the same name exists
in the reduced data directory
:rtype: None
"""
logger.info("Started reduction of light frames for %s", target)
if target == 'all':
for tar in self.data.light_meta:
self.reduce_lights(tar, force_new, **kwargs)
return None
# TODO: implement an update option
self.data.scan(raw=False, reduced=True)
if not force_new:
# check if reduced lights exist
for file in self.data.get_files(type='LIGHT', object=target):
# check if any reduced files exist
if len(self.data.get_files(type='LIGHT', object=target, reduced=True))==0:
break
if file in self.data.get_files(type='LIGHT', object=target, reduced=True):
logger.error("The file '%s' is already registered in the reduced data directory.", file)
raise RuntimeError(f"The file '{file}' is already registered in the reduced data direcory."\
"Use force_new=True if you want to verwrite")
# if this point is reached no file conflicts with the reduced data
# collect data
meta = self.data.light_meta[target]
for filt, expo in meta:
lights, hdrs, fnames = zip(*self.data.lights(target, header=True, fname=True, filter=filt, exposure=expo))
lights = list(lights)
_, header = self.data.hdu_from_file(fnames[0])
# correction
if self.master_bias is None:
logger.info("There is no master bias for the correction loaded.")
mbias = self.create_master_bias(**kwargs)
else:
mbias = self.master_bias
# find best dark and scale
mdark = self.__scale_dark(header)
used_filter = header.get('FILTER')
# check filter and get the corresponding flat
used_filter = str(used_filter) if used_filter is not None else None
if self.master_flats[used_filter] is None:
mflat = self.create_master_flats(**kwargs)
else:
mflat = self.master_flats[used_filter]
# apply the correction frames
lights = [(l-mbias-mdark)/mflat for l in lights]
for data, hdr, fname in zip(lights, hdrs, fnames):
# TODO: add header update
# remove the path from the filename
fname = os.path.basename(fname)
self.data.save_file(self.data.reduced_path/fname, data, hdr)
self.data.scan(raw=False, reduced=True)
logger.info("Finished correction of light frames for %s.", target)
[docs]
def stack_lights(self, target:str='all', alignment:Callable|None=None,
**kwargs)->None:
"""stacking the light frames of a given target. It will automatically loop over every filter.
:param target: the object in the image,
a value of 'all' means that this function is called recursevely
for every registered target, defaults to 'all'
:type target: str, optional
:param alignment: function to align the images,
should have the signature alignment(np.ndarray, np.ndarray) where the
first argument is the target and the second one is the source.
The source will be aligned to match the target, defaults to None
:type alignment: Callable | None, optional
"""
if target == 'all':
for tar in self.data.light_meta:
self.stack_lights(tar, alignment, **kwargs)
return None
logger.info("Started stacking of light frames for %s", target)
# collect data
for filt, expo in self.data.light_meta[target]:
lights, fnames = zip(*self.data.lights(target, fname=True, reduced=True,
filter=filt, exposure=expo, combined=None))
_, header = self.data.hdu_from_file(self.data.reduced_path/fnames[0])
# update header
# TODO: more detailed header update
header['COMBINED'] = True
header['NCOMBINE'] = len(lights)
# stack the frames
if alignment is not None:
lights = [alignment(lights[0], l) for l in lights]
logger.info("The light frames are alighned.")
master = self.combine(list(lights), **kwargs)
file_name = self.generate_filename('light', target, filt, str(expo))
self.data.save_file(self.data.reduced_path/file_name, master, header)
logger.debug("Combined %s frames to one master dark.\n"+
"Master filename: \t%s\n"+
"Filter: %s\n"+
"Exposure: %s\n"+
"The used frames are:\n\t%s", len(fnames), file_name, filt, expo, '\n\t'.join(fnames))
# update the masters
# self.data.master_light_files[target].append(file_name)
# self.master_lights[target].append(file_name)