107 changes: 107 additions & 0 deletions bin/RecenterAnalysisEnsemble.csh
@@ -0,0 +1,107 @@
#!/bin/csh -f

# (C) Copyright 2025 UCAR
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.

# Recenters an analysis ensemble on a given center state.
# Currently only works for an EnKF analysis ensemble

source config/environmentJEDI.csh # sets up the environment etc.
source config/auto/build.csh # RecenterBuildDir, RecenterEXE
source config/auto/enkf.csh # ensPbMemNDigits, ensPbNMembers, ensPbMemPrefix, workDirVarDA
source config/auto/experiment.csh # ConfigDir
source config/auto/model.csh # outerNamelistFile, outerStreamsFile
source config/auto/naming.csh # analysisSubDir, ANFilePrefix, DAWorkDir

# Obtain date information
# set CYLC_TASK_CYCLE_POINT = "20180415T0600Z" # will be set by cylc eventually
echo "Cylc task cycle point: $CYLC_TASK_CYCLE_POINT" # debugging
set yyyy = `echo ${CYLC_TASK_CYCLE_POINT} | cut -c 1-4`
set mm = `echo ${CYLC_TASK_CYCLE_POINT} | cut -c 5-6`
set dd = `echo ${CYLC_TASK_CYCLE_POINT} | cut -c 7-8`
set hh = `echo ${CYLC_TASK_CYCLE_POINT} | cut -c 10-11`
set cycleDate = "${yyyy}${mm}${dd}${hh}"
set cycleDateISO8601 = "${yyyy}-${mm}-${dd}T${hh}:00:00Z"
set cycleDateMpas = "${yyyy}-${mm}-${dd}_${hh}.00.00"
echo "Current cycle date: $cycleDate" # debugging

# Change to work directory and copy configuration yaml file
set cyclingDADirEnKF = "${DAWorkDir}/${cycleDate}"
set yamlFileRecenter = "recenter.yaml"
cd "${cyclingDADirEnKF}/run"
echo "Working in directory: `pwd`" # debugging
cp -v "${ConfigDir}/jedi/applications/${yamlFileRecenter}" "$yamlFileRecenter"
if ( $status != 0 ) then
echo "ERROR: recenter yaml not available --> $yamlFileRecenter" > ./FAIL
exit 1
endif

# Populate placeholders in yaml file
# 1) date information
sed -i "s@{{cycleDateISO8601}}@$cycleDateISO8601@" $yamlFileRecenter

# 2) center geometry information
# todo: update so that varDA can have different resolution
set cyclingDADirVar = "${workDirVarDA}/CyclingDA/${cycleDate}"
set namelistFileCenter = "${cyclingDADirVar}/${outerNamelistFile}"
set streamsFileCenter = "${cyclingDADirVar}/${outerStreamsFile}"
sed -i "s@{{namelistFileCenter}}@$namelistFileCenter@" $yamlFileRecenter
sed -i "s@{{streamsFileCenter}}@$streamsFileCenter@" $yamlFileRecenter

# 3) center analysis information.
# The current assumption is that analysisSubDir and ANFilePrefix in the
# EnKF and var DA runs are identical. This is not guaranteed, but is a reasonable assumption.
# set mpasFileSuffix = '$Y-$M-$D_$h.$m.$s.nc'
set mpasFileSuffix = "${cycleDateMpas}.nc"
set analysisFileCenter = "${cyclingDADirVar}/${analysisSubDir}/${ANFilePrefix}.${mpasFileSuffix}"
sed -i "s@{{analysisFileCenter}}@$analysisFileCenter@" $yamlFileRecenter

# 4) EnKF ensemble geometry information
# set cyclingDADirEnKF = "${DAWorkDir}/${cycleDate}"
set namelistFileEnsemble = "${cyclingDADirEnKF}/${outerNamelistFile}"
set streamsFileEnsemble = "${cyclingDADirEnKF}/${outerStreamsFile}"
sed -i "s@{{namelistFileEnsemble}}@$namelistFileEnsemble@" $yamlFileRecenter
sed -i "s@{{streamsFileEnsemble}}@$streamsFileEnsemble@" $yamlFileRecenter

# 5) EnKF analysis ensemble information
# Add an _orig suffix to label the original analysis ensemble member
set analysisFileEnsembleBase = "${ANFilePrefix}_orig.${mpasFileSuffix}"
# set analysisFileEnsemble = "${cyclingDADirEnKF}/${analysisSubDir}/${ensPbMemPrefix}%iMember%/${ANFilePrefix}_orig.${mpasFileSuffix}"
set analysisFileEnsemble = "${cyclingDADirEnKF}/${analysisSubDir}/${ensPbMemPrefix}%iMember%/${analysisFileEnsembleBase}"
sed -i "s@{{analysisFileEnsemble}}@$analysisFileEnsemble@" $yamlFileRecenter
sed -i "s@{{paddingEnsembleMembers}}@$ensPbMemNDigits@" $yamlFileRecenter
sed -i "s@{{numberEnsembleMembers}}@$ensPbNMembers@" $yamlFileRecenter

# 6) Recentered output
# The recentered analysis files receive the standard analysis file name so that a subsequent
# forecast can be started from them
set analysisFileRecenterBase = "${ANFilePrefix}.${mpasFileSuffix}"
# set analysisFileRecenter = "${cyclingDADirEnKF}/${analysisSubDir}/${ensPbMemPrefix}%iMember%/${ANFilePrefix}.${mpasFileSuffix}"
set analysisFileRecenter = "${cyclingDADirEnKF}/${analysisSubDir}/${ensPbMemPrefix}%iMember%/${analysisFileRecenterBase}"
sed -i "s@{{analysisFileRecenter}}@$analysisFileRecenter@" $yamlFileRecenter

# Copy original analysis files (recall that the recentered files have the original name)
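# Illustration with assumed settings (ensPbMemPrefix = "mem", ensPbMemNDigits = 3, ANFilePrefix = "an"):
# for member 1 the copy below is
#   cp .../mem001/an.2018-04-15_06.00.00.nc .../mem001/an_orig.2018-04-15_06.00.00.nc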
@ i = 1
while ( $i <= $ensPbNMembers )
set memberDirBase = `printf "${ensPbMemPrefix}%0${ensPbMemNDigits}d" $i`
set memberDir = "${cyclingDADirEnKF}/${analysisSubDir}/${memberDirBase}"
cp -v "${memberDir}/${analysisFileRecenterBase}" "${memberDir}/${analysisFileEnsembleBase}"
@ i++
end

# Link and run the recentering executable. This overwrites the analysis files
set jediOutputFile = "recenter.log"
ln -sfv "${RecenterBuildDir}/${RecenterEXE}" ./
mpiexec "./${RecenterEXE}" "$yamlFileRecenter" "./${jediOutputFile}" >& recenter.log.all

# Make sure the application terminated successfully
grep 'Run: Finishing oops.* with status = 0' "$jediOutputFile"
if ( $status != 0 ) then
echo "ERROR: recenter application failed" > ./FAIL
exit 1
# to do: is a further cleanup along these lines necessary?
# else
# rm ${appName}.log.0*
endif
118 changes: 118 additions & 0 deletions config/jedi/applications/recenter.yaml
@@ -0,0 +1,118 @@
# Variables to recenter: _state variables
# ---------------------------------------
# All variables from the da_state I/O stream are recentered, except for the following:
# 1) static variables (terrain, hydrostatic balances):
# - ter
# - xland
# - dzs
# - zs
# - pressure_base
# - rho_base
# - theta_base
#
# 2) real-valued variables that are identical across ensemble members:
# - sfc_albbck
# - sfc_emibck
#
# 3) scalar integer variables identical across ensemble members
# (a real-valued mean would not be meaningful)
# - isice_lu
# - iswater_lu
#
# 4) variables containing information about past states
# - xicem
# - h_oml_initial
#
# 5) redundant variables
# - air_pressure (note: we do recenter pressure_p)

_state variables: &daStateVars
- water_vapor_mixing_ratio_wrt_dry_air
- cloud_liquid_water
- rain_water
- cloud_liquid_ice
- snow_water
- graupel
- cloud_ice_number_concentration
- rain_number_concentration
- cloud_droplet_number_concentration
- u
- w
- dry_air_density
- pressure_p
- air_potential_temperature
- relative_humidity
- eastward_wind
- northward_wind
- air_pressure_at_surface
- cldfrac
- re_cloud
- re_ice
- re_snow
- refl10cm
- refl10cm_max
- rainc
- rainnc
- lai
- sfc_albedo
- mavail
- sfc_emiss
- thc
- ust
- z0
- znt
- skin_temperature_at_surface
- snow
- snowc
- snowh
- sst
- tmn
- vegetation_area_fraction
- seaice
- seaice_fraction
- eastward_wind_at_10m
- northward_wind_at_10m
- water_vapor_mixing_ratio_wrt_moist_air_at_2m
- air_temperature_at_2m
- precipw
- sh2o
- smois
- tslb

_current date: &currentDate {{cycleDateISO8601}}

recenter variables: *daStateVars

center geometry:
nml_file: {{namelistFileCenter}}
streams_file: {{streamsFileCenter}}
deallocate non-da fields: false

center:
filename: {{analysisFileCenter}}
date: *currentDate
state variables: *daStateVars
stream name: da_state
transform model to analysis: false

ensemble geometry:
nml_file: {{namelistFileEnsemble}}
streams_file: {{streamsFileEnsemble}}
deallocate non-da fields: false

ensemble:
members from template:
template:
date: *currentDate
state variables: *daStateVars
filename: {{analysisFileEnsemble}}
stream name: da_state
transform model to analysis: false
pattern: %iMember%
start: 1
zero padding: {{paddingEnsembleMembers}}
nmembers: {{numberEnsembleMembers}}
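# Illustration: with a zero padding of 3, %iMember% in the template filename expands to
# 001, 002, ..., up to nmembers, so member 1 is read from .../mem001/<analysis file> when the
# member directory prefix is "mem" (prefix shown here only for illustration)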

recentered output:
filename: {{analysisFileRecenter}}
stream name: da_state
1 change: 1 addition & 0 deletions initialize/applications/DA.py
@@ -176,6 +176,7 @@ def export(self, previousForecast:str, ef:ExtendedForecast):
for st in self.__subtasks:
self._tasks += st._tasks
self._dependencies += st._dependencies
self._xtriggers += st._xtriggers

# depends on previous Forecast
self.tf.addDependencies([previousForecast])
110 changes: 110 additions & 0 deletions initialize/applications/EnKF.py
@@ -1,6 +1,8 @@
#!/usr/bin/env python3

from collections import OrderedDict
from getpass import getuser
import os.path

from initialize.applications.Members import Members

@@ -86,6 +88,7 @@ class EnKF(Component):
'biasCorrection': [False, bool],

# directory that stores varBC coefficients that are updated with variational DA
# if coupledToVarDA == True, staticVarBcDir is automatically set to the coupled var DA directory
'staticVarBcDir': ['/glade/campaign/mmm/parc/ivette/pandac/year7Exp/ivette_3dhybrid-allsky-60-60-iter_O30kmI60km_ensB-SE80+RTPP70_VarBC_v3.0.2_newBenchmark_allsky-amsua/CyclingDA', str],

## tropprsMethod
@@ -118,6 +121,34 @@ class EnKF(Component):
## IR/VIS land surface coefficients classification
# OPTIONS: USGS, IGBP, NPOESS
'IRVISlandCoeff': ['IGBP', str],

## Coupling to an external variational DA run
# When coupledToVarDA is true, this EnKF run can be recentered (recenterAnalyses == True) on the variational DA run and/or
# it can use the variational bias correction coefficients (biasCorrection == True) of the variational DA run
'coupledToVarDA': [False, bool],

## Recentering of analysis ensemble
# When recenterAnalyses is true, the analysis ensemble of this EnKF is recentered on the analysis of a
# concurrently running var DA run. Requires coupledToVarDA == True.
'recenterAnalyses': [False, bool],
}

optionalVariables = {
## Coupling to an external variational DA run
# Must be specified if coupledToVarDA == True, not used otherwise
# Specify the full name of the workflow, constructed as:
# Experiment['prefix'] + Experiment['name'] + Experiment['suffix'] + Experiment['suite identifier']
# The default values are:
# Experiment['prefix'] = "${USER}_"
# Experiment['name']: see Cycle.py
# Experiment['suffix'] = ""
# Experiment['suite identifier'] = ""
# It is best practice to specify the experiment prefix, name, and suffix in the coupled yaml files
# to make sure the coupled workflow can be identified correctly.
# Note: the workflow automatically prepends 'MPAS-Workflow' to workflowNameVarDA where needed
# to make the identifier consistent with submit.csh
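# Hypothetical example: prefix "jdoe_", name "3dhybrid_O30kmIE60km", and empty suffix and
# suite identifier give workflowNameVarDA: "jdoe_3dhybrid_O30kmIE60km"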
'workflowNameVarDA': str,
}

def __init__(self,
@@ -183,6 +214,31 @@ def __init__(self,
# TODO: this needs to be non-zero for EnKF workflows that use IAU, get value from forecast
self._set('ensPbOffsetHR', 0)

# coupling to variational DA
if self['coupledToVarDA']:
# A coupled EnKF run should use some information from the variational DA.
# Stop the run if no information is used in the current configuration so that the
# user can adjust the settings.
if not self['biasCorrection'] and not self['recenterAnalyses']:
raise ValueError(("Coupled EnKF does not use any information from var DA."
"biasCorrectionEnKF and/or recenterAnalyses should be set to true."))
# A coupled EnKF run requires the name of the variational DA workflow to
# set the external dependency
if self['workflowNameVarDA'] is None:
raise ValueError("workflowNameVarDA has to be specified for a coupled EnKF")
# Export the var DA run information to the csh file
self._set('workflowNameVarDA', self['workflowNameVarDA'])
workDirVarDA = os.path.join('/glade', 'derecho', 'scratch', getuser(),
'pandac', self['workflowNameVarDA'])
self._set('workDirVarDA', workDirVarDA)
# Overwrite the static var BC directory to make sure we are reading the satbias files from
# the coupled var DA run
self._set('staticVarBcDir', os.path.join(workDirVarDA, 'CyclingDA'))
else:
# Recentering the analysis ensemble is not defined if the EnKF run is not coupled.
if self['recenterAnalyses']:
raise NotImplementedError("Recentering of the analysis ensemble is undefined if the EnKF run is not coupled.")

self._cshVars = list(self._vtable.keys())

########################
@@ -262,6 +318,60 @@ def __init__(self,
inherit = '''+self.tf.execute+''', BATCH
script = $origin/bin/EnKF.csh Solver
'''+solvertask.job()+solvertask.directives()]

if self['coupledToVarDA']:
# The coupled EnKF run uses the analysis file and/or the variational bias correction files
# from the variational DA run. This introduces a dependency between the EnKF and the
# variational DA at the current cycle point. The following external trigger defines this dependency.
# Note: submit.csh prepends 'MPAS-Workflow/' to the workflow name to generate the workflow ID
workflowIdVarDA = os.path.join('MPAS-Workflow', self['workflowNameVarDA'])
callIntervalInS = 30 # call interval for external trigger (default is 10)
self._xtriggers +=[('\n'
' var_da = workflow_state('
f'workflow="{workflowIdVarDA}", '
'task="DAFinished__", point="%(point)s")'
f':PT{callIntervalInS}S')]
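# For illustration, the rendered xtrigger entry in the suite definition reads
# (with a hypothetical workflowNameVarDA of "jdoe_3dhybrid_O30kmIE60km"):
#   var_da = workflow_state(workflow="MPAS-Workflow/jdoe_3dhybrid_O30kmIE60km", task="DAFinished__", point="%(point)s"):PT30S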

# Add the dependency on the var DA run to the graph
self._dependencies += [('\n'
' @var_da => ' + self.tf.pre)]
if self['recenterAnalyses']:
# Add the dependency: the EnKFDiagOMA task is optional. If the task is enabled, the recentering
# task should run after it. If it is not enabled, the recentering task should run after the solver task.
if self['diagEnKFOMA'] and self['retainObsFeedback']:
self._dependencies += [('\n'
' EnKFDiagOMA => RecenterEnKF')]
else:
self._dependencies += [('\n'
' EnKFSolver => RecenterEnKF')]
# Add the task
keyListRecenter = {
# Notes:
# - the recenter application is currently small enough to run in the develop queue
# - the develop queue does not support a job_priority flag, so that directive is omitted for now
'retry': {'t': str},
'baseSeconds': {'t': int},
'secondsPerMember': {'t': int},
'nodes': {'t': int},
'PEPerNode': {'t': int},
'memory': {'t': str},
'queue': {'def': hpc['SharedQueue']},
'account': {'def': hpc['CriticalAccount']},
'email': {'def': True, 't': bool},
}
resourceRecenter = meshes['Outer'].name + '.' + solver + '.recenter'
recenterJob = Resource(self._conf, keyListRecenter, ('job', resourceRecenter))
recenterJob._set('seconds', recenterJob['baseSeconds'] + recenterJob['secondsPerMember'] * NN)
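# Illustrative sizing: with baseSeconds = 300, secondsPerMember = 5, and NN = 80 members,
# the requested wallclock is 300 + 5*80 = 700 seconds (values are examples only)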
recenterTask = TaskLookup[hpc.system](recenterJob)
self._tasks += [('\n'
' [[RecenterEnKF]]'
'\n'
f' inherit = {self.tf.execute}, BATCH'
'\n'
' script = $origin/bin/RecenterAnalysisEnsemble.csh'
'\n'
f'{recenterTask.job() + recenterTask.directives()}'
'\n')]

if self['diagEnKFOMA'] and self['retainObsFeedback']:
self._tasks += ['''