Source code for sbm.pipelines

# -*- encoding: utf-8 -*-

import litebird_sim as lbs
import numpy as np
from multiprocessing import Pool
import healpy as hp
from tqdm import tqdm
import os
from typing import List, Union
import fcntl
import random
from astropy import constants as const
from .scan_fields import ScanFields, DB_ROOT_PATH, channel_list
from .signal_fields import SignalFields
import pysm3
import pysm3.units as u


GREEN = "\033[92m"
RESET = "\033[0m"


[docs] class Configlation: """Configuration class for the simulation. Args: imo (`str`): imo instance given by the litebird_sim channel (`str`): The name of the channel Attributes: lbs_base_path (`str`): The base path of the litebird_sim imo_version (`str`): The version of the imo nside (`int`): The nside of the healpix map mdim (`int`): The dimension to perform the map-making parallel (`bool`): If `True`, the simulation is performed in thread parallel xlink_threshold (`float`): The threshold of the cross-linking. The pixel with the value less than this threshold is ignored when the map-making is performed. only_iqu (`bool`): If `True`, only P, Q, and U are returned after the map-making """
[docs] def __init__(self, imo, channel): self.imo = imo self.channel = channel self.lbs_base_path = None self.imo_version = "v2" self.nside = 128 self.mdim = 3 self.parallel = True self.xlink_threshold = 0.7 self.only_iqu = True self.use_hwp = None self.spin_n_basis = [] self.spin_m_basis = []
class BandpassParams: def __init__( self, detectors: List[str], gamma_T_list: Union[list, None] = None, gamma_B_list: Union[list, None] = None, ): self.detectors = detectors self.gamma_T_list = gamma_T_list self.gamma_B_list = gamma_B_list
[docs] class Systematics: """Systematics class for the simulation Attributes: sigma_gain_T (`float`): The standard deviation of the gain for the top detectors sigma_gain_B (`float`): The standard deviation of the gain for the bottom detectors sigma_rho_T (`float`): The standard deviation of the pointing for the top detectors sigma_rho_B (`float`): The standard deviation of the pointing for the bottom detectors sigma_chi_T (`float`): The standard deviation of the polarization angle for the top detectors sigma_chi_B (`float`): The standard deviation of the polarization angle for the bottom detectors syst_seed (`int`): The seed for the random number generator for the systematics noise_seed (`int`): The seed for the random number generator for the noise """
[docs] def __init__(self): self.sigma_gain_T = None self.sigma_gain_B = None self.sigma_rho_T = None self.sigma_rho_B = None self.sigma_chi_T = None self.sigma_chi_B = None self.syst_seed = None self.noise_seed = None self.start_seed = None self.end_seed = None self.bpm = None
[docs] def set_bandpass_mismatch( self, detectors: List[str], gamma_T_list: Union[list, None] = None, gamma_B_list: Union[list, None] = None, ): self.bpm = BandpassParams(detectors, gamma_T_list, gamma_B_list)
def process_gain(args): ( i, filename, dirpath, gain_T, gain_B, temp_map, pol_map, mdim, only_iqu, xlink_threshold, ) = args sf = ScanFields.load_det(filename, dirpath) sf.xlink_threshold = xlink_threshold sf.use_hwp = False diff_gain_signal = SignalFields.diff_gain_field( sf, mdim, gain_T[i], gain_B[i], temp_map, pol_map ) output = sf.map_make(diff_gain_signal, only_iqu) result = { "hitmap": sf.hitmap, "map": output, "xlink2": np.abs(sf.get_xlink(2, 0)), } return result def process_pointing(args): ( i, filename, dirpath, rho_T, rho_B, chi_T, chi_B, pol_map, eth_I, eth_P, o_eth_P, mdim, only_iqu, xlink_threshold, ) = args sf = ScanFields.load_det(filename, dirpath) sf.xlink_threshold = xlink_threshold sf.use_hwp = False diff_signal = SignalFields.diff_pointing_field( sf, mdim, rho_T[i], rho_B[i], chi_T[i], chi_B[i], pol_map, eth_I, eth_P, o_eth_P ) output = sf.map_make(diff_signal, only_iqu) result = { "hitmap": sf.hitmap, "map": output, "xlink2": np.abs(sf.get_xlink(2, 0)), } return result
[docs] def generate_maps(mbs, config, lock=True): """Generate the maps with the lock file Args: mbs (`lbs.MbsParameters`): The litebird_sim object config (:class:`.Configlation`): The configuration class lock (`bool`): If `True`, the lock file is used """ if lock: lockfile = "/tmp/sbm_lockfile" with open(lockfile, "a") as f: try: fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: print("Another instance is running") fcntl.flock(f.fileno(), fcntl.LOCK_EX) try: map_info = mbs.run_all() fiducial_map = map_info[0][config.channel] input_maps = { "cmb": mbs.generate_cmb()[0], "fg": mbs.generate_fg()[0], } finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN) os.remove(lockfile) else: map_info = mbs.run_all() fiducial_map = map_info[0][config.channel] input_maps = { "cmb": mbs.generate_cmb()[0], "fg": mbs.generate_fg()[0], } return fiducial_map, input_maps
[docs] def sim_diff_gain_per_ch( config: Configlation, syst: Systematics, mbsparams: lbs.MbsParameters, ): """Simulate the differential gain systematics for each channel The map-making is performed for each detector in the channel Args: config (:class:`.Configlation`): The configuration class syst (:class:`.Systematics`): The systematics class mbsparams (`lbs.MbsParameters`): The parameters for the litebird_sim Returns: observed_map (`np.ndarray`): The observed map after the map-making input_maps (`dict`): The input maps for the simulation """ npix = hp.nside2npix(config.nside) telescope = config.channel[0] + "FT" sim = lbs.Simulation(base_path=config.lbs_base_path, random_seed=None) sim.set_instrument( lbs.InstrumentInfo.from_imo( config.imo, f"/releases/{config.imo_version}/satellite/{telescope}/instrument_info", ) ) ch_info = lbs.FreqChannelInfo.from_imo( url="/releases/" + config.imo_version + "/satellite/" + telescope + "/" + config.channel + "/channel_info", imo=config.imo, ) mbs = lbs.Mbs(simulation=sim, parameters=mbsparams, channel_list=ch_info) fiducial_map, input_maps = generate_maps(mbs, config, lock=False) temp_map = fiducial_map[0] pol_map = fiducial_map[1] + 1j * fiducial_map[2] dirpath = os.path.join(DB_ROOT_PATH, config.channel) filenames = os.listdir(dirpath) filenames = [os.path.splitext(filename)[0] for filename in filenames] assert syst.sigma_gain_T is not None assert syst.sigma_gain_B is not None if syst.syst_seed is not None: channel_id = np.where(np.array(channel_list) == config.channel)[0][0] np.random.seed(syst.syst_seed + channel_id) gain_T = np.random.normal(loc=0.0, scale=syst.sigma_gain_T, size=len(filenames)) gain_B = np.random.normal(loc=0.0, scale=syst.sigma_gain_B, size=len(filenames)) else: gain_T = np.random.normal(loc=0.0, scale=syst.sigma_gain_T, size=len(filenames)) gain_B = np.random.normal(loc=0.0, scale=syst.sigma_gain_B, size=len(filenames)) observed_map = np.zeros([3, npix]) sky_weight = np.zeros(npix) if config.parallel is True: file_args = [ ( i, filename, dirpath, gain_T, gain_B, temp_map, pol_map, config.mdim, config.only_iqu, config.xlink_threshold, ) for i, filename in enumerate(filenames) ] with Pool() as pool: for i, result in enumerate( tqdm( pool.imap(process_gain, file_args), total=len(file_args), desc=f"{GREEN}Processing {config.channel}{RESET}", bar_format="{l_bar}{bar:10}{r_bar}", colour="green", ) ): observed_map += result["map"] sky_weight[result["xlink2"] < config.xlink_threshold] += 1.0 else: for i, filename in enumerate( tqdm( filenames, desc=f"{GREEN}Processing {config.channel}{RESET}", bar_format="{l_bar}{bar:10}{r_bar}", colour="green", ) ): sf = ScanFields.load_det(filename, base_path=dirpath) sf.xlink_threshold = config.xlink_threshold sf.use_hwp = config.use_hwp diff_gain_signal = SignalFields.diff_gain_field( sf, config.mdim, gain_T[i], gain_B[i], temp_map, pol_map ) output = sf.map_make(diff_gain_signal, config.only_iqu) observed_map += output xlink2 = np.abs(sf.get_xlink(2, 0)) sky_weight[xlink2 < config.xlink_threshold] += 1.0 observed_map = np.array(observed_map) / sky_weight return observed_map, input_maps
[docs] def sim_diff_pointing_per_ch( config: Configlation, syst: Systematics, mbsparams: lbs.MbsParameters, ): """Simulate the differential pointing systematics for each channel The map-making is performed for each detector in the channel Args: config (:class:`.Configlation`): The configuration class syst (:class:`.Systematics`): The systematics class mbsparams (`lbs.MbsParameters`): The parameters for the litebird_sim Returns: observed_map (`np.ndarray`): The observed map after the map-making input_maps (`dict`): The input maps for the simulation """ npix = hp.nside2npix(config.nside) telescope = config.channel[0] + "FT" sim = lbs.Simulation(base_path=config.lbs_base_path, random_seed=None) sim.set_instrument( lbs.InstrumentInfo.from_imo( config.imo, f"/releases/{config.imo_version}/satellite/{telescope}/instrument_info", ) ) ch_info = lbs.FreqChannelInfo.from_imo( url="/releases/" + config.imo_version + "/satellite/" + telescope + "/" + config.channel + "/channel_info", imo=config.imo, ) mbs = lbs.Mbs(simulation=sim, parameters=mbsparams, channel_list=ch_info) fiducial_map, input_maps = generate_maps(mbs, config, lock=False) pol_map = fiducial_map[1] + 1j * fiducial_map[2] dirpath = os.path.join(DB_ROOT_PATH, config.channel) filenames = os.listdir(dirpath) filenames = [os.path.splitext(filename)[0] for filename in filenames] assert syst.sigma_rho_T is not None assert syst.sigma_rho_B is not None assert syst.sigma_chi_T is not None assert syst.sigma_chi_B is not None if syst.syst_seed is not None: channel_id = np.where(np.array(channel_list) == config.channel)[0][0] np.random.seed(syst.syst_seed + channel_id) rho_T = np.random.normal(loc=0.0, scale=syst.sigma_rho_T, size=len(filenames)) rho_B = np.random.normal(loc=0.0, scale=syst.sigma_rho_B, size=len(filenames)) chi_T = np.random.normal(loc=0.0, scale=syst.sigma_chi_T, size=len(filenames)) chi_B = np.random.normal(loc=0.0, scale=syst.sigma_chi_B, size=len(filenames)) else: rho_T = np.random.normal(loc=0.0, scale=syst.sigma_rho_T, size=len(filenames)) rho_B = np.random.normal(loc=0.0, scale=syst.sigma_rho_B, size=len(filenames)) chi_T = np.random.normal(loc=0.0, scale=syst.sigma_chi_T, size=len(filenames)) chi_B = np.random.normal(loc=0.0, scale=syst.sigma_chi_B, size=len(filenames)) dI = hp.alm2map_der1(hp.map2alm(fiducial_map[0]), nside=config.nside) dQ = hp.alm2map_der1(hp.map2alm(fiducial_map[1]), nside=config.nside) dU = hp.alm2map_der1(hp.map2alm(fiducial_map[2]), nside=config.nside) eth_I = dI[2] - 1j * dI[1] eth_P = dQ[2] + dU[1] - 1j * (dQ[1] - dU[2]) o_eth_P = dQ[2] - dU[1] + 1j * (dQ[1] + dU[2]) observed_maps = np.zeros([3, npix]) sky_weight = np.zeros(npix) if config.parallel is True: file_args = [ ( i, filename, dirpath, rho_T, rho_B, chi_T, chi_B, pol_map, eth_I, eth_P, o_eth_P, config.mdim, config.only_iqu, config.xlink_threshold, ) for i, filename in enumerate(filenames) ] with Pool() as pool: for i, result in enumerate( tqdm( pool.imap(process_pointing, file_args), total=len(file_args), desc=f"{GREEN}Processing {config.channel}{RESET}", bar_format="{l_bar}{bar:10}{r_bar}", colour="green", ) ): observed_maps += result["map"] sky_weight[result["xlink2"] < config.xlink_threshold] += 1.0 else: for i, filename in enumerate( tqdm( filenames, desc=f"{GREEN}Processing {config.channel}{RESET}", bar_format="{l_bar}{bar:10}{r_bar}", colour="green", ) ): sf = ScanFields.load_det(filename, base_path=dirpath) sf.xlink_threshold = config.xlink_threshold sf.use_hwp = config.use_hwp diff_signal = SignalFields.diff_pointing_field( sf, config.mdim, rho_T[i], rho_B[i], chi_T[i], chi_B[i], pol_map, eth_I, eth_P, o_eth_P, ) output = sf.map_make(diff_signal, config.only_iqu) xlink2 = np.abs(sf.get_xlink(2, 0)) sky_weight[xlink2 < config.xlink_threshold] += 1.0 observed_maps += output observed_maps = np.array(observed_maps) / sky_weight return observed_maps, input_maps
def process_bpm(args): ( idet, dirpath, gamma_T_list, gamma_B_list, component_tmaps, pol_map, mdim, only_iqu, xlink_threshold, ) = args sf = ScanFields.load_det(idet, dirpath) sf.xlink_threshold = xlink_threshold sf.use_hwp = False signal_field = SignalFields.bandpass_mismatch_field( sf, mdim, pol_map, gamma_T_list, gamma_B_list, component_tmaps, ) output = sf.map_make(signal_field, only_iqu) result = { "hitmap": sf.hitmap, "map": output, "xlink2": np.abs(sf.get_xlink(2, 0)), } return result def dBodTth(nu): return lbs.hwp_sys.hwp_sys._dBodTth(nu) def dBRJ_dT(nu): return (2 * const.k_B.value * nu * nu * 1e18) / const.c.value / const.c.value def BlackBody(nu, T): x = const.h.value * nu * 1e9 / const.k_B.value / T ex = np.exp(x) exm1 = ex - 1.0e0 return ( 2 * const.h.value * nu * nu * nu * 1e27 / const.c.value / const.c.value / exm1 ) def sed_dust(nu, betad, Td=20.0, nud=545.0): if isinstance(Td, float) and isinstance(betad, float): sed = ((nu / nud) ** betad) * BlackBody(nu, Td) / BlackBody(nud, Td) else: sed = ( ((nu / nud) ** betad[..., np.newaxis]) * BlackBody(nu, Td[..., np.newaxis]) / BlackBody(nud, Td[..., np.newaxis]) ) return sed # synch spectral index is -3 +2 due to RJ to power conversion def sed_synch(nu, betas=-1.0, nus=0.408): if isinstance(betas, float): sed = (nu / nus) ** betas else: sed = (nu / nus) ** (betas[..., np.newaxis] + 2) return sed def sed_freefree(nu, freq_ref_I=30.0, pl_index=-2.14): # spectral index is -2.14 +2 due to RJ to power conversion return (nu / freq_ref_I) ** (pl_index + 2) def color_correction_dust(nu, nu0, betad, band, Td=20.0): return ( np.trapz(band * sed_dust(nu, betad, Td) / sed_dust(nu0, betad, Td), nu) / np.trapz(band * dBodTth(nu), nu) * dBodTth(nu0) ) # nu^(-2) throughput factor already in band definition def color_correction_synch(nu, nu0, band, betas=-1.0): return ( np.trapz(band * sed_synch(nu, betas) / sed_synch(nu0, betas), nu) / np.trapz(band * dBodTth(nu), nu) * dBodTth(nu0) ) def color_correction_freefree(nu, nu0, band): return ( np.trapz(band * sed_freefree(nu) / sed_freefree(nu0), nu) / np.trapz(band * dBodTth(nu), nu) * dBodTth(nu0) ) def color_correction_co(nu, band, lines, line_frequency, out_units): weights = band * dBRJ_dT(nu) / np.trapz(band * dBRJ_dT(nu), nu) weights /= np.trapz(weights, nu) out = 0 for line in lines: line_freq = line_frequency[line].to_value(u.GHz) # check if the line is in the bandpass if line_freq >= nu[0] and line_freq <= nu[-1]: # interpolate the value of the bandpass at the freq of the CO line weight = np.interp(line_freq, nu, weights) convert_to_uK_RJ = (1 * u.K_CMB).to_value( u.uK_RJ, equivalencies=u.cmb_equivalencies(line_freq * u.GHz), ) # sums over the co lines in the bandpass out += convert_to_uK_RJ * weight # converts to K_CMB/uK_CMB units sed = (out << u.uK_RJ) * pysm3.bandpass_unit_conversion(nu * u.GHz, band, out_units) return sed.value def co_map(nu, lines, line_frequency, nside, template): out = np.zeros((3, hp.nside2npix(nside)), dtype=np.float64) for line in lines: line_freq = line_frequency[line].to_value(u.GHz) if line_freq >= nu[0] and line_freq <= nu[-1]: print(f"CO line {line_freq} in the band") I_map = template[line].copy() # if self.include_high_galactic_latitude_clouds: # I_map += self.simulate_high_galactic_latitude_CO(line) # if self.has_polarization: # out[1:] += ( # self.simulate_polarized_emission(I_map).value # * convert_to_uK_RJ # * weight ) out[0] += I_map.value return out def init_ame( map_I, freq_ref_I, emissivity, freq_peak, freq_ref_peak, nside, max_nside=None, unit_I=None, map_dist=None, ): I_ref = pysm3.read_map(map_I, nside, unit=unit_I) I_ref <<= u.uK_RJ freq_ref_I = u.Quantity(freq_ref_I).to(u.GHz) try: # input is a number freq_peak = u.Quantity(freq_peak).to(u.GHz) except TypeError: # input is a path freq_peak = pysm3.read_map(freq_peak, nside, unit=u.GHz) freq_ref_peak = u.Quantity(freq_ref_peak).to(u.GHz) freq_peak /= freq_ref_peak emissivity = pysm3.models.template.read_txt(emissivity, unpack=True) return I_ref, freq_ref_I, freq_peak, emissivity def sed_ame(nu, band, freq_ref_I, freq_peak, emissivity, nside): freqs = pysm3.utils.check_freq_input(nu * u.GHz) # tau(nu) * dB_RJ(nu)/dT / np.trapz( tau(nu) * dB_RJ(nu)/dT, nu) weights = pysm3.utils.normalize_weights(nu, band) if len(freqs) > 1: sed = np.zeros((hp.nside2npix(nside), len(freqs))) for i, (freq, _weight) in enumerate(zip(freqs, weights)): scaled_freq = freq / freq_peak scaled_ref_freq = freq_ref_I / freq_peak sed[:, i] = ( (freq_ref_I / freq) ** 2 * np.interp(scaled_freq, emissivity[0], emissivity[1]) / np.interp(scaled_ref_freq, emissivity[0], emissivity[1]) ) sed_tot = np.trapz(sed * weights, freqs) else: scaled_freq = freqs / freq_peak scaled_ref_freq = freq_ref_I / freq_peak sed = np.zeros(hp.nside2npix(nside)) sed = ( (freq_ref_I / freqs) ** 2 * np.interp(scaled_freq, emissivity[0], emissivity[1]) / np.interp(scaled_ref_freq, emissivity[0], emissivity[1]) ) sed_tot = sed * weights return sed_tot
[docs] def sim_bandpass_mismatch( config: Configlation, syst: Systematics, mbsparams: lbs.MbsParameters, detector_list: Union[list, None] = None, base_path: Union[str, None] = None, ): """Simulate the bandpass mismatch systematics. Args: config (:class:`.Configlation`): The configuration class syst (:class:`.Systematics`): The systematics class mbsparams (`lbs.MbsParameters`): The parameters for the litebird_sim detector_list (list of `lbs.DetectorInfo`): List of DetectorInfo for each detector considered, generated with `lbs.DetectorInfo`. In each DetectorInfo, the `band_weights` are the bandpass for that detector. These bandpasses are used to compute the foreground input maps. If None, the foreground maps are computed at the central freq of the channel. base_path (`str`): The base path for the HDF5 files containing the cross-link maps Returns: observed_map (`np.ndarray`): The observed map after the map-making returned_input_map (`np.ndarray`): The input map for the simulation """ npix = hp.nside2npix(config.nside) telescope = config.channel[0] + "FT" sim = lbs.Simulation(base_path=config.lbs_base_path, random_seed=None) if config.imo_version != "IMo_vReformationPlan_Option1M": sim.set_instrument( lbs.InstrumentInfo.from_imo( config.imo, f"/releases/{config.imo_version}/satellite/{telescope}/instrument_info", ) ) ch_info = lbs.FreqChannelInfo.from_imo( url="/releases/" + config.imo_version + "/satellite/" + telescope + "/" + config.channel + "/channel_info", imo=config.imo, ) else: sim.set_instrument( lbs.InstrumentInfo.from_imo( config.imo, f"/releases/{config.imo_version}/LMHFT/instrument_info", ) ) ch_info = lbs.FreqChannelInfo.from_imo( url="/releases/" + config.imo_version + "/LMHFT/" + config.channel + "/channel_info", imo=config.imo, ) fg_models = mbsparams.fg_models # no bandpass integration to compute fg at nu0 mbsparams.bandpass_int = False mbs = lbs.Mbs(simulation=sim, parameters=mbsparams, channel_list=ch_info) map_info = mbs.run_all()[0] input_map_nu0 = map_info[config.channel] fgs = mbs.generate_fg()[0] fg_tmap_list = [fgs[fg][0][0] for fg in fg_models] if mbsparams.units == "uK_CMB": out_units = u.uK_CMB if mbsparams.units == "K_CMB": out_units = u.K_CMB if "pysm_dust_1" in fg_models: mbb_T = pysm3.read_map("pysm_2/dust_temp.fits", nside=config.nside) mbb_ind = pysm3.read_map("pysm_2/dust_beta.fits", nside=config.nside) if "pysm_synch_1" in fg_models: mbb_s = pysm3.read_map("pysm_2/synch_beta.fits", nside=config.nside) if "pysm_co_1" in fg_models: lines = ["10", "21", "32"] line_index = {"10": 0, "21": 1, "32": 2} line_frequency = { "10": 115.271 * u.GHz, "21": 230.538 * u.GHz, "32": 345.796 * u.GHz, } # has_polarization = False # include_high_galactic_latitude_clouds = False template_nside = 512 if config.nside <= 512 else 2048 planck_templatemap_filename = ( f"co/HFI_CompMap_CO-Type1_{template_nside}_R2.00_ring.fits" ) remote_data = pysm3.utils.RemoteData() map_in = pysm3.models.template.read_map( remote_data.get(planck_templatemap_filename), field=[line_index[line] for line in lines], unit=u.K_CMB, nside=template_nside, ) planck_templatemap = pysm3.models.co_lines.build_lines_dict( lines, hp.ud_grade(np.array(map_in), nside_out=config.nside) << u.K_CMB, ) if "pysm_ame_1" in fg_models: freq_ref_I_1 = "22.8 GHz" emissivity_1 = "pysm_2/emissivity.txt" freq_peak_1 = "pysm_2/ame_nu_peak_0.fits" freq_ref_peak_1 = "30 GHz" freq_ref_I_2 = "41 GHz" emissivity_2 = "pysm_2/emissivity.txt" freq_peak_2 = "33.35 GHz" freq_ref_peak_2 = "30 GHz" I_ref1, freq_ref_I1, freq_peak1, emissivity1 = init_ame( "pysm_2/ame_t_new.fits", freq_ref_I_1, emissivity_1, freq_peak_1, freq_ref_peak_1, config.nside, unit_I="uK_RJ", ) I_ref2, freq_ref_I2, freq_peak2, emissivity2 = init_ame( "pysm_2/ame2_t_new.fits", freq_ref_I_2, emissivity_2, freq_peak_2, freq_ref_peak_2, config.nside, unit_I="uK_RJ", ) if not base_path: dirpath = os.path.join(DB_ROOT_PATH, config.channel) else: dirpath = os.path.join(base_path, config.channel) # computing the gamma factors from the bandpasses if detector_list: gamma_T_dict = {} gamma_B_dict = {} assert len(detector_list) == len(syst.bpm.detectors) # now bandpass integration mbsparams.bandpass_int = True mbs_bp = lbs.Mbs( simulation=sim, parameters=mbsparams, detector_list=detector_list ) map_info_bp = mbs_bp.run_all()[0] returned_input_map = np.zeros([3, npix]) pol_map = {} for d in detector_list: if ( "pysm_dust_1" in fg_models or "pysm_synch_1" in fg_models or "pysm_co_1" in fg_models or "pysm_ame_1" in fg_models ): if d.name[-1] == "T": gamma_T_dict[d.name] = np.zeros((len(fg_models), npix)) if d.name[-1] == "B": gamma_B_dict[d.name] = np.zeros((len(fg_models), npix)) else: if d.name[-1] == "T": gamma_T_dict[d.name] = np.zeros(len(fg_models)) if d.name[-1] == "B": gamma_B_dict[d.name] = np.zeros(len(fg_models)) # index of bpm.detectors with same name as in d input_maps_d = map_info_bp[d.name] returned_input_map += map_info_bp[d.name] pol_map[d.name] = input_maps_d[1] + 1.0j * input_maps_d[2] # ind = np.where(np.isin(syst.bpm.detectors, d.name)) for ifg, fg in enumerate(fg_models): if fg == "pysm_dust_0": g = color_correction_dust( nu=d.band_freqs_ghz, nu0=d.bandcenter_ghz, betad=1.54, band=d.band_weights, ) if fg == "pysm_synch_0": g = color_correction_synch( nu=d.band_freqs_ghz, nu0=d.bandcenter_ghz, band=d.band_weights ) if fg == "pysm_dust_1": g = color_correction_dust( nu=d.band_freqs_ghz, nu0=d.bandcenter_ghz, betad=mbb_ind, band=d.band_weights, Td=mbb_T, ) if fg == "pysm_synch_1": g = color_correction_synch( nu=d.band_freqs_ghz, nu0=d.bandcenter_ghz, band=d.band_weights, betas=mbb_s, ) if fg == "pysm_freefree_1": g = color_correction_freefree( nu=d.band_freqs_ghz, nu0=d.bandcenter_ghz, band=d.band_weights ) if fg == "pysm_co_1": # setting the correct CO map, it would be otherwise 0 # this is just the template map, to be multiplied with g # to get the full map # in principle they are the same for each det, but we never know # the freq range between one det and the other could be different # and so they could include different numbers of CO lines # not really expected but still possible co_map_d = co_map( nu=d.band_freqs_ghz, lines=lines, line_frequency=line_frequency, nside=config.nside, template=planck_templatemap, )[0] g = ( color_correction_co( nu=d.band_freqs_ghz, band=d.band_weights, lines=lines, line_frequency=line_frequency, out_units=out_units, ) * co_map_d ) # we fix the I(nu0) map to 1, so that we recover the total CO map in g fg_tmap_list[ifg] = np.ones(npix) if fg == "pysm_ame_1": sed_ame1 = sed_ame( d.band_freqs_ghz, d.band_weights, freq_ref_I1.value, freq_peak1.value, emissivity1, config.nside, ) sed_ame2 = sed_ame( d.band_freqs_ghz, d.band_weights, freq_ref_I2.value, freq_peak2.value, emissivity2, config.nside, ) # the gamma factor is the whole map in this case (sed_ame_1*I_1 + sed_ame_2*I_2) g = ( sed_ame1 * I_ref1 + sed_ame2 * I_ref2 ) * pysm3.bandpass_unit_conversion( d.band_freqs_ghz * u.GHz, d.band_weights, out_units ) # we fix the I(nu0) map to 1, so that we recover the total AME map in g fg_tmap_list[ifg] = np.ones(npix) if d.name[-1] == "T" and gamma_T_dict[d.name].shape == ( len(fg_models), npix, ): if hasattr(g, "__len__"): gamma_T_dict[d.name][ifg, :] = g else: gamma_T_dict[d.name][ifg, :] = g * np.ones(npix) if d.name[-1] == "T" and gamma_T_dict[d.name].shape == ( len(fg_models), ): gamma_T_dict[d.name][ifg] = g if d.name[-1] == "B" and gamma_B_dict[d.name].shape == ( len(fg_models), npix, ): if hasattr(g, "__len__"): gamma_B_dict[d.name][ifg, :] = g else: gamma_B_dict[d.name][ifg, :] = g * np.ones(npix) if d.name[-1] == "B" and gamma_B_dict[d.name].shape == ( len(fg_models), ): gamma_B_dict[d.name][ifg] = g # using the values passed to the set_bandpass_mismatch class else: pol_map = input_map_nu0[1] + 1.0j * input_map_nu0[2] gamma_T_dict = syst.bpm.gamma_T_dict gamma_B_dict = syst.bpm.gamma_B_dict assert len(syst.bpm.detectors) // 2 == len(gamma_T_dict.keys()) assert len(syst.bpm.detectors) // 2 == len(gamma_B_dict.keys()) observed_map = np.zeros([3, npix]) sky_weight = np.zeros(npix) if config.parallel is True: file_args = [] for i, idet in enumerate(syst.bpm.detectors): if idet[-1] == "T": tname = idet bname = idet[:-1] + "B" if detector_list: # averaging over T and B pol input map pm = (pol_map[tname] + pol_map[bname]) / 2.0 else: pm = pol_map file_args.append( ( idet, dirpath, gamma_T_dict[tname], gamma_B_dict[bname], fg_tmap_list, pm, config.mdim, config.only_iqu, config.xlink_threshold, ) ) with Pool() as pool: for i, result in enumerate( tqdm( pool.imap(process_bpm, file_args), total=len(file_args), desc=f"{GREEN}Processing {config.channel}{RESET}", bar_format="{l_bar}{bar:10}{r_bar}", colour="green", ) ): observed_map += result["map"] sky_weight[result["xlink2"] < config.xlink_threshold] += 1.0 else: for i, idet in enumerate( tqdm( syst.bpm.detectors, desc=f"{GREEN}Processing {config.channel}{RESET}", bar_format="{l_bar}{bar:10}{r_bar}", colour="green", ) ): if idet[-1] == "T": tname = idet bname = idet[:-1] + "B" sf = ScanFields.load_det(idet, base_path=dirpath) sf.xlink_threshold = config.xlink_threshold sf.use_hwp = config.use_hwp if detector_list: # averaging over T and B pol input map pm = (pol_map[tname] + pol_map[bname]) / 2.0 else: pm = pol_map signal_field = SignalFields.bandpass_mismatch_field( sf, config.mdim, pm, gamma_T_dict[tname], gamma_B_dict[bname], fg_tmap_list, ) output = sf.map_make(signal_field, config.only_iqu) observed_map += output xlink2 = np.abs(sf.get_xlink(2, 0)) sky_weight[xlink2 < config.xlink_threshold] += 1.0 observed_map = np.array(observed_map) / sky_weight if not detector_list: returned_input_map = input_map_nu0 else: returned_input_map /= len(detector_list) return observed_map, returned_input_map
def generate_noise_seeds(config: Configlation, syst: Systematics, num_of_dets: int): channel_id = np.where(np.array(channel_list) == config.channel)[0][0] if syst.noise_seed is not None: _max = int(1e5) seed_range = list(range(0, int(_max))) random.seed(channel_id + syst.noise_seed) random.seed(random.randint(0, _max)) noise_seeds = random.choices(seed_range, k=num_of_dets) else: noise_seeds = [None] * num_of_dets return noise_seeds def process_noise(args): ( i, filename, dirpath, spin_n_basis, spin_m_basis, only_iqu, xlink_threshold, imo, use_hwp, noise_seed_i, ) = args sf = ScanFields.load_det(filename, dirpath) sf.xlink_threshold = xlink_threshold sf.use_hwp = use_hwp sf.generate_noise_pdf(imo, scale=2.0) output = sf.generate_noise(spin_n_basis, spin_m_basis, seed=noise_seed_i) result = { "hitmap": sf.hitmap, "map": output, "xlink2": np.abs(sf.get_xlink(2, 0)), } return result
[docs] def sim_noise_per_ch( config: Configlation, syst: Systematics, ): """Simulate the noise for each channel Args: config (:class:`.Configlation`): The configuration class syst (:class:`.Systematics`): The systematics class """ assert config.use_hwp is not None npix = hp.nside2npix(config.nside) dirpath = os.path.join(DB_ROOT_PATH, config.channel) filenames = os.listdir(dirpath) filenames = [os.path.splitext(filename)[0] for filename in filenames] noise_map = np.zeros([3, npix]) sky_weight = np.zeros(npix) noise_seeds = generate_noise_seeds(config, syst, len(filenames)) if config.parallel is True: file_args = [ ( i, filename, dirpath, config.spin_n_basis, config.spin_m_basis, config.only_iqu, config.xlink_threshold, config.imo, config.use_hwp, noise_seeds[i], ) for i, filename in enumerate(filenames) ] with Pool() as pool: for i, result in enumerate( tqdm( pool.imap(process_noise, file_args), total=len(file_args), desc=f"{GREEN}Processing {config.channel}{RESET}", bar_format="{l_bar}{bar:10}{r_bar}", colour="green", ) ): noise_map += result["map"] sky_weight[result["xlink2"] < config.xlink_threshold] += 1.0 else: for i, filename in enumerate( tqdm( filenames, desc=f"{GREEN}Processing {config.channel}{RESET}", bar_format="{l_bar}{bar:10}{r_bar}", colour="green", ) ): sf = ScanFields.load_det(filename, base_path=dirpath) sf.generate_noise_pdf(config.imo, scale=2.0) sf.use_hwp = config.use_hwp noise_map += sf.generate_noise( config.spin_n_basis, config.spin_m_basis, seed=noise_seeds[i] ) xlink2 = np.abs(sf.get_xlink(2, 0)) sky_weight[xlink2 < config.xlink_threshold] += 1.0 noise_map /= sky_weight return noise_map