Orbit prediction for Aeolus

Authors: Alexander Geiss

Abstract: Using orbit ground track files (GRND_TRACK) to predict Aeolus overpass locations and times

Load packages, modules and extensions

%load_ext blackcellmagic
%load_ext watermark
%watermark -i -v -p viresclient,pandas,xarray,matplotlib
Python implementation: CPython
Python version       : 3.9.7
IPython version      : 8.0.1

viresclient: 0.11.0
pandas     : 1.4.1
xarray     : 0.21.1
matplotlib : 3.5.1
from viresclient import AeolusRequest
import numpy as np
import pandas as pd
import datetime as dt
import os
from ftplib import FTP
import tarfile
import fnmatch
import math
import getpass
from base64 import b64decode
import ipywidgets as widgets
from ipyleaflet import (
    Map,
    Marker,
    LayersControl,
    MeasureControl,
)

request = AeolusRequest()

About orbit ground track files

Orbit ground track files are produced once per day to compute the AUX_MET files which contain ECMWF model variables and are necessary for processing the L2B wind product. Among other variables, these files include predicted geolocations from off-nadir (right-most swath point) and nadir (ground-track) intersections (see Fig. 1) with the Earth surface (WGS84) for one week. Geolocations are saved in steps of 3 seconds. Because the orbit repeat cycle of Aeolus is 7 days, all possible orbits are covered with one GRND_TRACK file. Since orbit ground track files are ASCII-files, they are easy to read an can be used to estimate overpass times for ground stations or whole regions in case of airborne campaigns. Since the mission requirement for the accuracy of the position of the reference orbit is +/- 25 km, one should always use the latest file if high accuracy is required. Otherwise it is sufficient to use any file within a period of the reference orbit. Once a week, so far on Thursdays between 13:30 and 14:30 UTC, so-called OCMs (Orbit Correction Maneuver) are performed, which may lead to strong deviations between the actual orbit and the prediction.

There were two different orbit periods for Aeolus, one with ANX 4.5 and one with ANX 2.0. ANX is the ascending node crossing at the equator and is specified as a longitude offset from the prime meridian at the equator in positive values toward the east. It is given for the first orbit east of the prime meridian at the equator.

ANX periods for the Aeolus mission:

Date

ANX

mission start - 17th June 2021

4.5

17th June 2021 - 21st June 2021

drift

21st June 2021 - today

2.0

Aeolus measurement geometry

aeolus_meas_geometry_encoded = b""
widgets.Image(value=b64decode(aeolus_meas_geometry_encoded))

Figure 1: Aeolus measurement geometry. Distance between right-most and left-most swath point is around 21 km. Image courtesy of ESA.

In order to compare collocated reference measurements with Aeolus measurements, the off-nadir intersection with the Earth’s surface is of interest.

Download orbit ground track files per FTP

Because the orbit files are not part of the VirES database, they must be downloaded separately via FTP connection from the ADDF server.
This option is only available for users who have the necessary access rights. Mainly these are members of Aeolus Cal/Val teams.

Defining a date for the GRND_TRACK file

# Date in dd.mm.yyyy
GRND_TRACK_FILE_DATE = "13.05.2020"

# separate in day, month, year
day = GRND_TRACK_FILE_DATE.split(".")[0]
month = GRND_TRACK_FILE_DATE.split(".")[1]
year = GRND_TRACK_FILE_DATE.split(".")[2]

Check if file is already available in user workspace

# Get user folder and set/create folder for GRND_TRACK files
user_folder = os.path.expanduser("~")
grnd_track_folder = os.path.join(user_folder, "files/GRND_TRACK")
os.makedirs(grnd_track_folder, exist_ok=True)

# Check if file for date of interest is already available
grnd_track_file_exists = False
for file in os.listdir(grnd_track_folder):
    if fnmatch.fnmatch(file, "*{}{}{}*.DBL".format(year, month, day)):
        grnd_track_file_path = os.path.join(grnd_track_folder, file)
        grnd_track_file_exists = True

Connect to Aeolus Data Dissemination FTP server and fetch GRND_TRACK file if file is not yet in user space

if not grnd_track_file_exists:
    # Set user and password from ESA SSO account
    USERNAME = os.environ["DATAU"]
    PW = os.environ["DATAP"]
    # Uncomment following two lines to provide your credentials as input
    # USERNAME = input("Please enter your SSO user name:")
    # PW = getpass.getpass("Please enter your password:")

    # Connect to the ADDF FTP server
    ftp = FTP("aeolus-ds.eo.esa.int")
    ftp.login(user=USERNAME, passwd=PW)

    # change directory
    ftp.cwd("/ADDF/Orbit_Data/{}-{}/{}/".format(year, month, day))

    # get file list of directory
    file_list = ftp.nlst()

    # check if GRND_TRACK file is available and set file name
    try:
        grnd_track_file_name = [k for k in file_list if "GRND" in k][0]
    except:
        print("Warning: No ground track file available for this date")
        assert False

    # Download ground track file to user space
    grnd_track_file_packed_path = os.path.join(grnd_track_folder, grnd_track_file_name)
    with open(grnd_track_file_packed_path, "wb") as fp:
        ftp.retrbinary("RETR " + grnd_track_file_name, fp.write)

    # close FTP connection
    ftp.quit()

    # Extract ground track file
    tar = tarfile.open(grnd_track_file_packed_path, "r:gz")
    
    # Define grnd_track_file name for DBL-File
    grnd_track_file_name_unpacked = grnd_track_file_name.split(".")[0] + ".DBL"

    # Extract only the .DBL file containing the orbit predictions
    tar.extract(grnd_track_file_name_unpacked, path=grnd_track_folder)
    tar.close()
    
    # Define grnd_track_file_path for DBL-File
    grnd_track_file_path = os.path.join(grnd_track_folder, grnd_track_file_name_unpacked)

Define necessary functions

def read_grnd_track(file):
    """
    Function to read Aeolus Ground Track files.

    Parameters
    ----------
    file : string
        Input file path.

    Returns
    -------
    data : dict
        Aeolus orbit location for nadir and off-nadir as function of time.

    """
    data = {}
    date_time = os.path.basename(file)[19:34].split("T")
    data["date"] = date_time[0]
    data["time"] = date_time[1]

    # define a function to convert date and time to datetime objects when reading the file with numpy.genfromtxt
    convertfunc = lambda x: dt.datetime.strptime(x, "%Y/%m/%d-%H:%M:%S.%f")
    data_raw = np.genfromtxt(
        file,
        skip_header=3,
        dtype=None,
        names="Date_time, Nadir_lat, Nadir_lon, Offpoint_lat, Offpoint_lon",
        converters={"Date_time": convertfunc},
        encoding='utf-8',
    )
    for i in data_raw.dtype.names:
        data[i] = data_raw[i]
    return data


def get_map(data, xlim, geo_lim, altitude_in_km):
    """
    main function to process data and create leaflet map with predicted orbit locations as marker
    """
    offset_in_km = convert_to_offset_in_km(altitude_in_km)
    lat, lon, date_time, orbit_node = apply_filter(data, xlim, geo_lim)
    lon_with_offset = get_offset(lat, lon, orbit_node, offset_in_km)

    # add aditional offset because of an offset which was found between the actual measurement and the prediction
    # This turned out to be a problem in the L1B product and could be solved now. So it was actually not an error in the orbit prediction.
    # This line is now commented out
    # lon_with_offset = add_prediction_offset(lon_with_offset, offset_in_degree=-0.07)

    # get lat and lon in degrees and minutes
    lat_dm, lon_dm, lon_with_offset_dm = get_coord_in_dm(lat, lon, lon_with_offset)

    # plot orbit locations on leaflet map
    m = plot_orbits(
        lat,
        lon,
        lon_with_offset,
        lat_dm,
        lon_dm,
        lon_with_offset_dm,
        date_time,
        offset_in_km,
    )

    return m


def export_orbits_as_csv(data, xlim, geo_lim, altitude_in_km):
    """
    Optional export of orbits as csv-file for different ground-track offset values
    corresponding to different altitudes.
    """
    offset_in_km = convert_to_offset_in_km(altitude_in_km)
    lat, lon, date_time, orbit_node = apply_filter(data, xlim, geo_lim)

    lon_with_offset_11km = get_offset(lat, lon, orbit_node, 11)
    # lon_with_offset_11km = add_prediction_offset(lon_with_offset_11km, offset_in_degree=-0.07)
    lon_with_offset_21km = get_offset(lat, lon, orbit_node, 21)
    # lon_with_offset_21km = add_prediction_offset(lon_with_offset_21km, offset_in_degree=-0.07)
    lon_with_offset_gui = get_offset(lat, lon, orbit_node, offset_in_km)
    # lon_with_offset_gui = add_prediction_offset(lon_with_offset_gui, offset_in_degree=-0.07)

    time = [i.strftime("%Y/%m/%d-%H:%M:%S") for i in date_time]

    file_name = "Orbit_prediction_{}.txt".format(date_time[0].strftime("%Y%m%d_%H%M"))

    with open(file_name, "w") as file:
        file.write(
            "# Aeolus orbit prediction from {} until {} \n".format(time[0], time[-1])
            + "Offsets for 0 km, 11 km (15 km altitude), 21 km (30 km altitude) and user defined \n"
            + "# time \t  lat \t lon_0km_offset \t lon_11km_offset \t lon_21km_offset \t lon_{:.2f}km_offset \n".format(
                offset_in_km
            )
        )

        for i in range(len(lat)):
            file.write(
                "{} \t {:.3f} \t {:.3f} \t {:.3f} \t {:.3f} \t {:.3f} \n".format(
                    time[i],
                    lat[i],
                    lon[i],
                    lon_with_offset_11km[i],
                    lon_with_offset_21km[i],
                    lon_with_offset_gui[i],
                )
            )


def convert_to_offset_in_km(altitude_in_km):
    """Convert altitude of Aeolus intersection into an longitudinal offset in km"""
    offset_in_km = round(math.tan(math.radians(35)) * altitude_in_km, 2)

    return offset_in_km


def apply_filter(data, xlim, geo_lim):
    """
    apply geolocation and time filter for Orbit Ground Track File data
    """
    # determine orbit node (ascending or descending)
    orbit_node = determine_orbit_node(data["Offpoint_lat"])

    # apply time filter and assign values to variables
    lat = data["Offpoint_lat"][xlim[0]:xlim[1]]
    lon = data["Offpoint_lon"][xlim[0]:xlim[1]]
    date_time = data["Date_time"][xlim[0]:xlim[1]]
    orbit_node = orbit_node[xlim[0]:xlim[1]]

    # create additional geolocation filter mask
    lat_lon_mask = (lat > geo_lim[0][0]) & (lat < geo_lim[0][1]) & (lon > geo_lim[1][0]) & (lon < geo_lim[1][1])

    # apply geolocation filter mask
    lat = lat[lat_lon_mask]
    lon = lon[lat_lon_mask]
    orbit_node = orbit_node[lat_lon_mask]
    date_time = date_time[lat_lon_mask]

    check_if_array_empty(lat)

    return lat, lon, date_time, orbit_node


def check_if_array_empty(array):
    if array.size == 0:
        raise ValueError('Filters are too strict. No orbits can be found within the chosen times and geolocations.')


def determine_orbit_node(lat):
    """
    determine ascending or descending node from latitude values
    1 = ascending
    -1 = descending
    """
    orbit_node = np.full(len(lat), -1)
    orbit_node[:-1][np.diff(lat) < 0] = 1

    return orbit_node


def get_offset(lat, lon, orbit_node, offset_in_km):
    """
    Determine longitude offset from offset in kilometre as function of latitude and orbit node.
    Note that this is not reliably applicable to very high latitudes.
    """
    lon_with_offset = lon + offset_in_km / (111.32 * np.cos(np.deg2rad(lat))) * orbit_node

    return lon_with_offset


def add_prediction_offset(lon_with_offset, offset_in_degree):
    """
    Due to an offset between the actual measurement and the predictet orbit which is under investigation,
    an additional offset is necessary and can be set with this function.
    This turned out to be a problem in the L1B product and could be solved now. So it was actually not an error in the orbit prediction.
    This correction is no longer needed and will not be applied in the prediction.
    """
    lon_with_offset = lon_with_offset + offset_in_degree

    return lon_with_offset


def plot_orbits(lat, lon, lon_w_offset, lat_dm, lon_dm, lon_with_offset_dm, date_time, offset_in_km):
    """
    Create leaflet map and plot orbit locations as markers
    """
    # create string representation of times
    time = [i.strftime("%a %Y/%m/%d-%H:%M:%S") for i in date_time]

    # determine center of map depending on geolocations to plot
    map_center_lat = lat.min() + (lat.max() - lat.min()) / 2.0
    map_center_lon = lon.min() + (lon.max() - lon.min()) / 2.0

    # geolocations as strings to show in marker message
    lat_dm_str = ["{:d}° {:.2f}' N".format(i[0], i[1]) for i in lat_dm]
    lon_dm_str = ["{:d}° {:.2f}' E".format(i[0], i[1]) for i in lon_dm]
    lon_with_offset_dm_str = ["{:d}° {:.2f}' E".format(i[0], i[1]) for i in lon_with_offset_dm]

    # create leaflet map
    m = Map(
        center=[map_center_lat, map_center_lon],
        zoom=2,
        dragging=True,
        scroll_wheel_zoom=True,
    )

    for j, i in enumerate(zip(lat, lon, lon_w_offset, lat_dm_str, lon_dm_str, lon_with_offset_dm_str)):
        # leaflet marker
        new_marker = Marker(location=(i[0], i[2]), draggable=False)
        message = widgets.HTML()

        # popup message to show when marker is clicked
        message.value = (
            "Lat: {:.3f} Lon: {:.3f} <br>"
            + "Lat: {} Lon: {} <br>"
            + "Lon with {:.2f} km offset from ground track <br>"
            + "Time: {} UTC"
        ).format(i[0], i[2], i[3], i[5], offset_in_km, time[j])

        new_marker.popup = message
        m.add_layer(new_marker)

    # add controls to map
    m.add_control(LayersControl())
    measure = MeasureControl(position="bottomleft", active_color="orange", primary_length_unit="kilometers")
    m.add_control(measure)
    measure.completed_color = "red"

    # map height
    m.layout.height = "600px"

    return m


def determine_axis_limits(data):
    '''Get axis limits and string represenation of time for the GUI'''
    x_lim = [(i.strftime("%Y/%m/%d-%H:%M:%S"), j) for j, i in enumerate(data["Date_time"])]

    return x_lim


def decimaldegree_to_dms(deg):
    '''Convert decimal degrees to degrees minutes and secons'''
    f, d = math.modf(deg)
    s, m = math.modf(abs(f) * 60)

    return (int(d), int(m), s * 60)


def decimaldegree_to_dm(deg):
    '''Convert decimal degrees to degrees and decimal minutes'''
    f, d = math.modf(deg)

    return (int(d), abs(f) * 60)


def get_coord_in_dms(lat, lon, lon_with_offset):
    '''Convert an array or list of coordinates from decimal degrees to degrees minutes and seconds'''
    lat_dms = [decimaldegree_to_dms(i) for i in lat]
    lon_dms = [decimaldegree_to_dms(i) for i in lon]
    lon_with_offset_dms = [decimaldegree_to_dms(i) for i in lon_with_offset]

    return lat_dms, lon_dms, lon_with_offset_dms


def get_coord_in_dm(lat, lon, lon_with_offset):
    '''Convert an array or list of coordinates from decimal degrees to degrees and decimal minutes'''
    lat_dm = [decimaldegree_to_dm(i) for i in lat]
    lon_dm = [decimaldegree_to_dm(i) for i in lon]
    lon_with_offset_dm = [decimaldegree_to_dm(i) for i in lon_with_offset]

    return lat_dm, lon_dm, lon_with_offset_dm

Search for available GRND_TRACK files in user space (must be downloaded first per FTP)

Orbit_Grnd_Track_files = [('0000', '0')]

for root, dirs, files in os.walk(grnd_track_folder):
    for name in sorted(files):
        if fnmatch.fnmatch(name, '*GRND_TRACK*.DBL'):
            Orbit_Grnd_Track_files.append((name, os.path.join(root, name)))

Define geolocation, time, altitude to create leaflet map with orbit data

As an alternative a GUI is created in the next cell where you can choose between different files which are already in your user space. You just need to uncomment the code.

# geolocation
# lat between -90 and 90
lat_min = 40
lat_max = 55
# lon between -180 and 180
lon_min = 5
lon_max = 20

# please choose a time within 7 days of the chosen GRND_TRACK_FILE_DATE in following format: YYYY/MM/DD-HH:MM:SS
time_min = "2020/05/15-00:00:00"
time_max = "2020/05/20-00:00:00"

# altitude of the measurement - geolocation differs due to the 35 degree off-nadir angle
altitude_in_km = 12

orbit_grnd_track_data = read_grnd_track(grnd_track_file_path)

xlim_min = np.argmin(
    np.abs(orbit_grnd_track_data["Date_time"] - dt.datetime.strptime(time_min, "%Y/%m/%d-%H:%M:%S"))
)
xlim_max = np.argmin(
    np.abs(orbit_grnd_track_data["Date_time"] - dt.datetime.strptime(time_max, "%Y/%m/%d-%H:%M:%S"))
)

leaflet_map = get_map(
    data=orbit_grnd_track_data,
    xlim=(xlim_min, xlim_max),
    geo_lim=((lat_min, lat_max), (lon_min, lon_max)),
    altitude_in_km=altitude_in_km,
)

Create graphical user interface

Please uncomment the following cell if you want to use a GUI for the selection of the orbit ground track data

# filelist = widgets.Dropdown(
#     options=sorted(Orbit_Grnd_Track_files),
#     # value=sorted(Orbit_Grnd_Track_files)[0],
#     description="File:",
#     disabled=False,
# )

# output = widgets.Output()

# clear_button = widgets.Button(description="Clear Loaded Files")

# plot_button = widgets.Button(description="Plot Data")

# export_button = widgets.Button(description="Export Data")

# file_list_text = widgets.HTML(
#     description="Loaded Files:",
# )

# loaded_files = {"0000": "00"}
# selected_data = {}

# selected_file = widgets.Dropdown(
#     options=loaded_files,
#     description="Selected File:",
#     disabled=False,
# )

# set_xlim_min = widgets.SelectionSlider(
#     options=[0, 1, 2, 3, 4, 5],
#     description="xlim_min:",
#     disabled=False,
#     layout={"width": "500px"},
# )

# set_xlim_max = widgets.SelectionSlider(
#     options=[0, 1, 2, 3, 4, 5],
#     description="xlim_max:",
#     disabled=False,
#     layout={"width": "500px"},
# )

# set_geolocation_lat = widgets.IntRangeSlider(
#     value=(40, 55),
#     min=-90,
#     max=90,
#     description="latitude range:",
#     disabled=False,
#     layout={"width": "500px"},
#     style={"description_width": "initial"},
# )

# set_geolocation_lon = widgets.IntRangeSlider(
#     value=(5, 20),
#     min=-180,
#     max=180,
#     description="longitude range:",
#     disabled=False,
#     layout={"width": "500px"},
#     style={"description_width": "initial"},
# )

# set_offset = widgets.BoundedIntText(
#     description="Altitude where to intersect with Aeolus",
#     value=12,
#     min=0,
#     max=30,
#     layout={"width": "500px"},
#     style={"description_width": "initial"},
# )


# def on_clear_button_clicked(b):
#     global loaded_files
#     loaded_files = {"0000": "0"}
#     file_list_text.value = ""


# def on_plot_button_clicked(b):
#     global leaflet_map
#     leaflet_map = get_map(
#         data=selected_data,
#         xlim=(set_xlim_min.value, set_xlim_max.value),
#         geo_lim=(set_geolocation_lat.value, set_geolocation_lon.value),
#         altitude_in_km=set_offset.value,
#     )
#     with output:
#         print("Map available")


# def on_csv_export_button_clicked(b):
#     export_orbits_as_csv(
#         data=selected_data,
#         xlim=(set_xlim_min.value, set_xlim_max.value),
#         geo_lim=(set_geolocation_lat.value, set_geolocation_lon.value),
#         altitude_in_km=set_offset.value,
#     )


# def handle_filelist_selection(select):
#     global loaded_files
#     data = read_grnd_track(select.new)
#     data_key = data["date"] + "_" + data["time"]
#     loaded_files[data_key] = data
#     file_list_text.value = "<br>".join(list(loaded_files.keys()))
#     selected_file.options = sorted(loaded_files)


# def handle_file_selection(select):
#     global selected_data
#     selected_data = loaded_files[select.new]
#     xlim = determine_axis_limits(selected_data)
#     set_xlim_min.options = xlim
#     set_xlim_max.options = xlim
#     set_xlim_min.value = xlim[0][1]
#     set_xlim_max.value = xlim[-1][1]


# # Connect widgets and functions
# filelist.observe(handle_filelist_selection, names="value")
# clear_button.on_click(on_clear_button_clicked)
# selected_file.observe(handle_file_selection, names="value")
# plot_button.on_click(on_plot_button_clicked)
# export_button.on_click(on_csv_export_button_clicked)

# # Layout
# leftbox = widgets.VBox(
#     [
#         filelist,
#         clear_button,
#         selected_file,
#         set_xlim_min,
#         set_xlim_max,
#         set_geolocation_lat,
#         set_geolocation_lon,
#         set_offset,
#         plot_button,
#         export_button,
#         output,
#     ]
# )
# widgets.HBox([leftbox, file_list_text])
# If Plot-button was pressed, wait if "map is available" and execute this cell to show the map.
leaflet_map