Source code for hipecta.image.telescope_reco

import numpy as np
import ctapipe
import hipecta.core
import astropy.units as u
from ctapipe.io.containers import HillasParametersContainer
from astropy.coordinates import Angle
from ctapipe.core import Component
from traitlets import Int
from copy import copy

'''
TO:
-1 Set a config by telescope_type
-Fill ctapipe event container r1, and dl0
'''


class TelescopeInfo:
    def __init__(self, reco_temporary):
        """
        Attributes
        ----------
        reco_temporary: hipecta.core.PRecoTemporary
        Parameters
        ----------
        reco_temporary: hipecta.core.PRecoTemporary containing:
            focalLength, matNeighbourQuadSum, nbSliceBeforPeak, matCalibratedSignal,
            matNeighbourSlice, nbSliceWindow, matKeepSignalQuad, matSignal,
            newMatCalibratedSignal, matNeighbourPixelSum, matSignalQuad,
            newMatKeepSignalQuad
        """
        self.reco_temporary = reco_temporary


[docs]class TelescopeReco(Component): """ Process a raw telescope waveform (R1) to DL1 (Hillas parameters) """ wavelet_threshold = Int(default_value=3, help="wavelet cleaning threshold").tag(config=True) hillas_threshold_signal_tel = Int(default_value=500, help="hillas threshold").tag(config=True) hillas_threshold_center = Int(default_value=3, help="hillas threshold center").tag(config=True) hillas_threshold_neighbour = Int(default_value=2, help="hillas threshold neighbour").tag(config=True) def __init__(self, config=None, tool=None, **kwargs): """ handle the r0 to dl1 reconstruction. Calibration/Integration/Cleaning/Hillas. Attributes ---------- telescope_info : dictionary Python dict. key is telescope id. Value is an instance of Telscope_info Parameters ---------- config : traitlets.loader.Config Configuration specified by config file or cmdline arguments. Used to set traitlet values. Set to None if no configuration to pass. tool : ctapipe.core.Tool or None Tool executable that is calling this component. Passes the correct logger to the component. Set to None if no Tool to pass. waveletThreshold : int Wavelet cleaning threshold. hillasThresholdSignalTel: int The hillas threshold kwargs """ super().__init__(config=config, parent=tool, **kwargs) self.telescope_info = dict() self.cut_config = hipecta.core.PConfigCut() self.cut_config.waveletThreshold = self.wavelet_threshold self.cut_config.hillasThresholdSignalTel = self.hillas_threshold_signal_tel self.cut_config.hillasThresholdCenter = self.hillas_threshold_center self.cut_config.hillasThresholdNeigbours = self.hillas_threshold_neighbour def _add_mc_telescope(self, telescope_id, number_samples, telescope_description: ctapipe.instrument.TelescopeDescription, camera: ctapipe.io.containers.MCCameraEventContainer): """ Add a telescope information and create gain, pedestal, ref_shape and pixel pos arrays Parameters ---------- telescope_id: int number_samples: int telescope_description: `ctapipe.instrument.telescope.TelescopeDescription` camera: ctapipe.io.containers.MCCameraEventContainer Returns ------- """ pixels_position = np.ascontiguousarray( np.transpose([telescope_description.camera.pix_x.to(u.m).value, telescope_description.camera.pix_y.to(u.m).value]).astype(np.float32)) gain_high = camera.dc_to_pe[0] try: gain_low = camera.dc_to_pe[1] except IndexError: gain_low = camera.dc_to_pe[0] pedestal_high = camera.pedestal[0] try: pedestal_low = camera.pedestal[1] except IndexError: pedestal_low = camera.pedestal[0] reference_shape = camera.reference_pulse_shape[0].astype(np.float32) reco_temporary = hipecta.core.createRecoTemporary(number_samples, 0, pixels_position, gain_high, gain_low, pedestal_high, pedestal_low, ) hipecta.core.updateRecoTemporaryWithRefShape(reco_temporary, reference_shape, 0.1) self.telescope_info[telescope_id] = TelescopeInfo(reco_temporary)
[docs] def add_real_telescope(self, telescope_id, number_samples, dc_to_pe, pedestal, reference_pulse_shape, telescope_description: ctapipe.instrument.TelescopeDescription): """ Add a telescope information and create gain, pedestal, ref_shape and pixel pos arrays Parameters ---------- telescope_id: int number_samples: int dc_to_pe: numpy ndarray shape:(nb_gain, nb_pixel) pedestal: numpy ndarray shape:(nb_gain, nb_pixel) reference_pulse_shape: numpy ndarray shape:(nb_gain, nb_pixel) telescope_description: `ctapipe.instrument.telescope.TelescopeDescription` Returns ------- """ pixels_position = np.ascontiguousarray( np.transpose([telescope_description.camera.pix_x.to(u.m).value, telescope_description.camera.pix_y.to(u.m).value]).astype(np.float32)) gain_high = dc_to_pe[0] try: gain_low = dc_to_pe[1] except IndexError: gain_low = dc_to_pe[0] pedestal_high = pedestal[0] try: pedestal_low = pedestal[1] except IndexError: pedestal_low = pedestal[0] reference_shape = reference_pulse_shape[0].astype(np.float32) # hipecta.core.createRecoTemporary will divide pedestal value by number of sample reco_temporary = hipecta.core.createRecoTemporary(number_samples, 0, pixels_position, gain_high, gain_low, pedestal_high, pedestal_low, ) hipecta.core.updateRecoTemporaryWithRefShape(reco_temporary, reference_shape, 0.1) self.telescope_info[telescope_id] = TelescopeInfo(reco_temporary)
def _process(self, telescope_id, waveform): """ Process a waveform and return its Hillas parameters Parameters ---------- waveform: `numpy.ndarray` of shape (number_slice, Returns ------- Hillas parameters """ reco_temporary = self.telescope_info[telescope_id].reco_temporary hillas_parameters, event_reco = hipecta.core.fullAnalysis(waveform, self.cut_config, reco_temporary) return hillas_parameters, event_reco
[docs] def process(self, telescope_id, event, fill_container=True): """ Process a ctapipe r0 event and return its Hillas parameters Parameters ---------- telescope_id: int event: ctapipe.io.containers.DataContainer fill_container: bool Returns ------- Hillas parameters """ HI_GAIN = 0 if telescope_id not in self.telescope_info: self._add_mc_telescope(telescope_id, event.r0.tel[telescope_id].num_samples, event.inst.subarray.tel[telescope_id], event.mc.tel[telescope_id]) waveform = event.r0.tel[telescope_id].waveform matslice = np.ascontiguousarray(waveform[0].T) hillas_parameters, event_reco = self._process(telescope_id, matslice) if fill_container: reco = self.telescope_info[telescope_id].reco_temporary # Store into event container event.dl0.tel[telescope_id].waveform = waveform# no reduction apply event.dl1.tel[telescope_id].image = [copy(reco.tabCalibSignal)] event.dl1.tel[telescope_id].extracted_samples = None event.dl1.tel[telescope_id].peakpos = copy(reco.tabPosMax) event.dl1.tel[telescope_id].cleaned = [reco.tabKeepSignalHex * reco.tabCalibSignal] return hillas_parameters, event_reco
[docs]def get_hillas_parameters_container(hillas_param): """ convert tuple of hillas parameter (from HiPeCTA to ctapipe HillasParametersContainer Parameters ---------- hillas_param: tuples containaing hillas paramters Returns ------- ctapipe HillasParametersContainer """ return HillasParametersContainer(intensity=hillas_param[hipecta.core.getHillasImageAmplitude()], x=hillas_param[hipecta.core.getHillasGx()] * u.m, y=hillas_param[hipecta.core.getHillasGy()] * u.m, length=np.sqrt(2) * hillas_param[hipecta.core.getHillasLength()] * u.m, width=np.sqrt(2) * hillas_param[hipecta.core.getHillasWidth()] * u.m, r=np.sqrt(hillas_param[hipecta.core.getHillasGx()] ** 2 + hillas_param[hipecta.core.getHillasGy()] ** 2) * u.m, phi=Angle(hillas_param[hipecta.core.getHillasPhi()] * u.rad), psi=Angle( (hillas_param[hipecta.core.getHillasDirection()]) * u.rad), skewness=hillas_param[hipecta.core.getHillassSewness()], kurtosis=hillas_param[hipecta.core.getHillasKurtosis()] )