Skip to content
Open
116 changes: 47 additions & 69 deletions pyobs/modules/telescope/basetelescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ class BaseTelescope(
__module__ = "pyobs.modules.telescope"

def __init__(
self,
fits_headers: Optional[Dict[str, Any]] = None,
min_altitude: float = 10,
wait_for_dome: Optional[str] = None,
**kwargs: Any,
self,
fits_headers: Optional[Dict[str, Any]] = None,
min_altitude: float = 10,
wait_for_dome: Optional[str] = None,
**kwargs: Any,
):
"""Initialize a new base telescope.

Expand All @@ -49,12 +49,6 @@ def __init__(
self._lock_moving = asyncio.Lock()
self._abort_move = asyncio.Event()

# celestial status
self._celestial_headers: Dict[str, Any] = {}

# add thread func
self.add_background_task(self._celestial, True)

# init mixins
WeatherAwareMixin.__init__(self, **kwargs)
MotionStatusMixin.__init__(self, **kwargs)
Expand Down Expand Up @@ -88,7 +82,6 @@ async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -
Raises:
MoveError: If telescope cannot be moved.
"""
...

@timeout(1200)
async def move_radec(self, ra: float, dec: float, **kwargs: Any) -> None:
Expand Down Expand Up @@ -142,7 +135,7 @@ async def move_radec(self, ra: float, dec: float, **kwargs: Any) -> None:
await self._change_motion_status(MotionStatus.TRACKING)

# update headers now
asyncio.create_task(self._update_celestial_headers())
await asyncio.create_task(self._calc_celestial_headers())
log.info("Finished moving telescope.")

@abstractmethod
Expand All @@ -157,7 +150,6 @@ async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -
Raises:
MoveError: If telescope cannot be moved.
"""
...

@timeout(1200)
async def move_altaz(self, alt: float, az: float, **kwargs: Any) -> None:
Expand Down Expand Up @@ -196,12 +188,11 @@ async def move_altaz(self, alt: float, az: float, **kwargs: Any) -> None:
# finish slewing
await self._change_motion_status(MotionStatus.POSITIONED)

# update headers now
asyncio.create_task(self._update_celestial_headers())
await asyncio.create_task(self._calc_celestial_headers()) #
log.info("Finished moving telescope.")

async def get_fits_header_before(
self, namespaces: Optional[List[str]] = None, **kwargs: Any
self, namespaces: Optional[List[str]] = None, **kwargs: Any
) -> Dict[str, Tuple[Any, str]]:
"""Returns FITS header for the current status of this module.

Expand All @@ -215,80 +206,65 @@ async def get_fits_header_before(
# define base header
hdr: Dict[str, Union[Any, Tuple[Any, str]]] = {}

# positions
await self._add_coordinate_header(hdr)
await self._add_side_information_header(hdr)

self._copy_header(self._fits_headers, hdr)

celestial_headers = await self._calc_celestial_headers()
self._copy_header(celestial_headers, hdr)

return hdr

async def _add_coordinate_header(self, header: Dict[str, Union[Any, Tuple[Any, str]]]) -> None:
try:
ra, dec = await self.get_radec()
coords_ra_dec = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame=ICRS)
alt, az = await self.get_altaz()
coords_alt_az = SkyCoord(alt=alt * u.deg, az=az * u.deg, frame=AltAz)

except Exception as e:
log.warning("Could not fetch telescope position: %s", e)
coords_ra_dec, coords_alt_az = None, None

# set coordinate headers
if coords_ra_dec is not None:
hdr["TEL-RA"] = (float(coords_ra_dec.ra.degree), "Right ascension of telescope [degrees]")
hdr["TEL-DEC"] = (float(coords_ra_dec.dec.degree), "Declination of telescope [degrees]")
if coords_alt_az is not None:
hdr["TEL-ALT"] = (float(coords_alt_az.alt.degree), "Telescope altitude [degrees]")
hdr["TEL-AZ"] = (float(coords_alt_az.az.degree), "Telescope azimuth [degrees]")
hdr["TEL-ZD"] = (90.0 - hdr["TEL-ALT"][0], "Telescope zenith distance [degrees]")
hdr["AIRMASS"] = (float(coords_alt_az.secz.value), "Airmass of observation start")

# convert to sexagesimal
if coords_ra_dec is not None:
hdr["RA"] = (str(coords_ra_dec.ra.to_string(sep=":", unit=u.hour, pad=True)), "Right ascension of object")
hdr["DEC"] = (str(coords_ra_dec.dec.to_string(sep=":", unit=u.deg, pad=True)), "Declination of object")

# site location
if self.observer is not None:
hdr["LATITUDE"] = (float(self.observer.location.lat.degree), "Latitude of telescope [deg N]")
hdr["LONGITUD"] = (float(self.observer.location.lon.degree), "Longitude of telescope [deg E]")
hdr["HEIGHT"] = (float(self.observer.location.height.value), "Altitude of telescope [m]")

# add static fits headers
for key, value in self._fits_headers.items():
hdr[key] = tuple(value)

# add celestial headers
for key, value in self._celestial_headers.items():
hdr[key] = tuple(value)

# finish
return hdr
return

async def _celestial(self) -> None:
"""Thread for continuously calculating positions and distances to celestial objects like moon and sun."""
header["TEL-RA"] = (float(coords_ra_dec.ra.degree), "Right ascension of telescope [degrees]")
header["TEL-DEC"] = (float(coords_ra_dec.dec.degree), "Declination of telescope [degrees]")

# wait a little
await asyncio.sleep(10)
header["TEL-ALT"] = (float(coords_alt_az.alt.degree), "Telescope altitude [degrees]")
header["TEL-AZ"] = (float(coords_alt_az.az.degree), "Telescope azimuth [degrees]")
header["TEL-ZD"] = (90.0 - header["TEL-ALT"][0], "Telescope zenith distance [degrees]")
header["AIRMASS"] = (float(coords_alt_az.secz.value), "Airmass of observation start")

# run until closing
while True:
# update headers
await self._update_celestial_headers()
header["RA"] = (str(coords_ra_dec.ra.to_string(sep=":", unit=u.hour, pad=True)), "Right ascension of object")
header["DEC"] = (str(coords_ra_dec.dec.to_string(sep=":", unit=u.deg, pad=True)), "Declination of object")

# sleep a little
await asyncio.sleep(30)
async def _add_side_information_header(self, header: Dict[str, Union[Any, Tuple[Any, str]]]) -> None:
if self.observer is None:
return

async def _update_celestial_headers(self) -> None:
header["LATITUDE"] = (float(self.observer.location.lat.degree), "Latitude of telescope [deg N]")
header["LONGITUD"] = (float(self.observer.location.lon.degree), "Longitude of telescope [deg E]")
header["HEIGHT"] = (float(self.observer.location.height.value), "Altitude of telescope [m]")

@staticmethod
def _copy_header(source: Dict[str, Any], target: Dict[str, Union[Any, Tuple[Any, str]]]) -> None:
for key, value in source.items():
target[key] = tuple(value)

async def _calc_celestial_headers(self) -> Dict[str, Tuple[Optional[float], str]]:
"""Calculate positions and distances to celestial objects like moon and sun."""
# get now
now = Time.now()
alt: Optional[float]
az: Optional[float]

# no observer?
if self.observer is None:
return
return {}

# get telescope alt/az
tel_altaz: Optional[SkyCoord] = None
try:
alt, az = await self.get_altaz()
tel_altaz = SkyCoord(alt=alt * u.deg, az=az * u.deg, frame="altaz")
except:
alt, az, tel_altaz = None, None, None
pass

# get current moon and sun information
moon_altaz = self.observer.moon_altaz(now)
Expand All @@ -300,13 +276,15 @@ async def _update_celestial_headers(self) -> None:
sun_dist = tel_altaz.separation(sun_altaz) if tel_altaz is not None else None

# store it
self._celestial_headers = {
celestial_headers = {
"MOONALT": (float(moon_altaz.alt.degree), "Lunar altitude"),
"MOONFRAC": (float(moon_frac), "Fraction of the moon illuminated"),
"MOONDIST": (None if moon_dist is None else float(moon_dist.degree), "Lunar distance from target"),
"SUNALT": (float(sun_altaz.alt.degree), "Solar altitude"),
"SUNDIST": (None if sun_dist is None else float(sun_dist.degree), "Solar Distance from Target"),
}

return celestial_headers


__all__ = ["BaseTelescope"]
45 changes: 26 additions & 19 deletions pyobs/modules/telescope/dummytelescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async def _move_radec(self, ra: float, dec: float, abort_event: asyncio.Event) -
"""

# start slewing
await self.__move(ra, dec, abort_event)
await self._move(ra, dec, abort_event)

async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -> None:
"""Actually moves to given coordinates. Must be implemented by derived classes.
Expand All @@ -93,12 +93,12 @@ async def _move_altaz(self, alt: float, az: float, abort_event: asyncio.Event) -
coords = SkyCoord(
alt=alt * u.degree, az=az * u.degree, obstime=Time.now(), location=self.location, frame="altaz"
)
icrs = coords.icrs
icrs = coords.transform_to("icrs")

# start slewing
await self.__move(icrs.ra.degree, icrs.dec.degree, abort_event)
await self._move(icrs.ra.degree, icrs.dec.degree, abort_event)

async def __move(self, ra: float, dec: float, abort_event: asyncio.Event) -> None:
async def _move(self, ra: float, dec: float, abort_event: asyncio.Event) -> None:
"""Simulate move.

Args:
Expand Down Expand Up @@ -139,20 +139,28 @@ async def set_focus(self, focus: float, **kwargs: Any) -> None:

# acquire lock
async with LockWithAbort(self._lock_focus, self._abort_focus):
log.info("Setting focus to %.2f..." % focus)
await self._change_motion_status(MotionStatus.SLEWING, interface="IFocuser")
ifoc = self._telescope.focus * 1.0
dfoc = (focus - ifoc) / 300.0
for i in range(300):
# abort?
if self._abort_focus.is_set():
raise InterruptedError("Setting focus was interrupted.")

# move focus and sleep a little
self._telescope.focus = ifoc + i * dfoc
await asyncio.sleep(0.01)
await self._change_motion_status(MotionStatus.POSITIONED, interface="IFocuser")
self._telescope.focus = focus
await self._set_focus(focus)

async def _set_focus(self, focus: float) -> None:
log.info("Setting focus to %.2f..." % focus)
await self._change_motion_status(MotionStatus.SLEWING, interface="IFocuser")

focus_steps = 300
initial_focus = self._telescope.focus * 1.0
focus_step_size = (focus - initial_focus) / focus_steps
for i in range(focus_steps):
await self._step_focus(focus_step_size)

await self._change_motion_status(MotionStatus.POSITIONED, interface="IFocuser")
self._telescope.focus = focus

async def _step_focus(self, step: float) -> None:
if self._abort_focus.is_set():
raise InterruptedError("Setting focus was interrupted.")

# move focus and sleep a little
self._telescope.focus += step
await asyncio.sleep(0.01)

async def list_filters(self, **kwargs: Any) -> List[str]:
"""List available filters.
Expand Down Expand Up @@ -297,7 +305,6 @@ async def stop_motion(self, device: Optional[str] = None, **kwargs: Any) -> None
Args:
device: Name of device to stop, or None for all.
"""
pass

async def get_focus_offset(self, **kwargs: Any) -> float:
"""Return current focus offset.
Expand Down
Empty file.
Loading