Source code for t1t2ne.scripts.t1t2ne_interactive

#! /usr/bin/env python3

import os
import numpy as np

from .base import BaseCommand
from .textcolor import textcolor
import klassez as kz
from . import t1t2ne_utils, fun_hetrelax_models, f_findfs
import random
from scipy.special import lambertw

[docs] class InteractiveCmd(BaseCommand): SHORT_HELP = "Interactive setup of the experiment" DESCRIPTION = '\n'.join([ 'With this module, the vdlist and vclist for protein dynamics experiment is computed.', 'It can run without any input, in which case it will start from a default of 1 s for T1 and 3 loops for the CPMG block, and then suggest a vdlist based on the estimated R1_ave and R2_ave of the system.', 'In the standard operation mode, the tau_c or MW are used to compute the optimal vdlist.', 'S2 is assumed to be 0.9 for the standard operation mode.', 'The IDP mode requires input of the molecular weight of the protein to estimate the correlation time of the slow motion.', 'Optionally, the length of the correlation window can be provided as an estimate for the intermediate correlation time.', 'If not provided, it assumes a correlation time of 1.6 ns for the intermediate motion.' 'Two S2 values are expected in the IDP mode. If not provided, they are assumed to be 0.15 for the slow motion and 0.35 for the intermediate motion.' ,])
[docs] @staticmethod def add_arguments(parser): parser.add_argument('--basedir', type=str, help='The base directory for the experiment. Default is the current directory.') parser.add_argument('--xred', nargs='*', help='The xred values to use for the calculation of the lists.') parser.add_argument('--S2', nargs='*', help='The Lipari-Szabo order parameter S2 to use for the calculation of tau_c. In IDP mode, two values should be provide, else only one') parser.add_argument('--tau', nargs='*', help='The correlation time tau to use for the calculation. In IDP mode, two values should be provided, else only one.') parser.add_argument('--idp', action='store_true', help='Whether to use the IDP model to extract the order parameter S2 instead of tau_c.') parser.add_argument('--MW', type=float, help='The molecular weight of the protein in kDa, to be used for estimating the tau_slow in the IDP model. If not specified, will raise an error if --idp is used.') parser.add_argument('--corr_window_idp', type=int, default=20, help='To estimate the correlation time of the intermediate motion in the IDP model, a correlation time of a peptide of 20 residues is used by default. This parameter allows to change this value if needed.') parser.add_argument('--T', type=float, default=298.15, help='The temperature in Kelvin, to be used for estimating the taus from MW. Default is 298 K.') parser.add_argument('--nT', nargs='*', help='The number of increments in the suggested vdlist. For this module only one value must be provided. Default is 8.') parser.add_argument('--r', type=float, default=1.02, help='The length of the 1H-15N bond in Angstroms. Default is 1.02 A.') parser.add_argument('--Deltasigma', type=float, default=-160, help='The chemical shift anisotropy of the 15N nucleus in ppm. Default is -160 ppm.') parser.add_argument('--theta', type=float, default=17, help='The angle between the 1H-15N bond and the principal axis of the CSA tensor in degrees. Default is 17 degrees.') parser.add_argument('--B0', nargs='*', help='The magnetic field strength in Tesla. If not provided, the script will try to load it from the config file. If it is not found in the config file, an error will be raised.') parser.add_argument('--Larmor', nargs=1, type=float, help='The Larmor frequency of the nucleus in MHz. If not provided, the script will try to load it from the config file. If it is not found in the config file, an error will be raised.') parser.add_argument('--logscale', action='store_true', help='Whether to use a logarithmic scale for the vdlist and vclist. Default is False, which means a linear scale will be used.') parser.add_argument('--nucs', nargs='*', default=['1H', '15N'], help='The nuclei to use for the calculation of the relaxation rates. Default is 1H and 15N.') parser.add_argument('--large', action='store_true', help='Whether to create the lists for the "large" sequence, which is optimized for short T2 times. If True, the d21 value is set to 450 us and only 8 cycles per CPMG block are used instead of 16. Default is False.') parser.add_argument('--small', action='store_true', help='Whether to create the lists for the ".idp" sequence, which is optimized for long T2 times. If True, the d21 value is set to 600 us. Default is False.') parser.add_argument('--randomize', action='store_true', help='Whether to randomize the order of the values in the lists. Default is False.')
[docs] @staticmethod def run(args): CO = t1t2ne_utils.Conf_Optns(args, module='interactive') interactive_setup(CO) t1t2ne_utils.the_end(CO) exit()
[docs] def interactive_setup(CO): """ Interactive setup of the experiment. This script can only run on a spectrometer. Parameters ---------- CO: Conf_Optns object The configuration options object containing the parameters for the experiment. Returns ------- None The function prints the suggested vdlist and vclist. """ CO.add_ref('ferrage') CO.add_ref('Fiorucci') CO.add_ref('klassez') CO.get_B0() expectedratio = np.real(np.exp(-(1+lambertw(1/np.e)))) # maximum reduction achievable with the optimal CPMG block, which is 1+lambertw(1/e) times the T2 time of the system if hasattr(CO, 'tau') and CO.tau is not None: CO.add_ref('fushman') R1, R2, nOe = fun_hetrelax_models.R1R2nOe(CO.B_0, r=CO.r, nuc1=CO.nucs[0], nuc2=CO.nucs[1], Deltasigma=CO.Deltasigma, func=fun_hetrelax_models.LS_iso, f_args=(CO.S2, CO.tau)) print(textcolor('Based on the provided parameters, the estimated R1 is {:.2f} s^-1 and R2 is {:.2f} s^-1.'.format(R1, R2), 'green')) T1_30 = -np.log(expectedratio)/R1 T2_30 = -np.log(expectedratio)/R2 else: T1_30 = 0.5 T2_30 = 0.065 if hasattr(CO, 'xred'): if len(CO.xred) == 1: CO.T1red = CO.T2red = CO.xred[0]*0.01 elif len(CO.xred) == 2: CO.T1red = CO.xred[0]*0.01 CO.T2red = CO.xred[1]*0.01 else: print(textcolor('Warning: the xred parameter accepts only one or two values, defaulting to 5% for T1 and 10% for T2', 'yellow', bold=True)) else: CO.T1red = float(input('Enter percent residual signal for the longest delay of the T1 experiment (default 5%): ').strip() or "5")*0.01 CO.T2red = float(input('Enter percent residual signal for the longest CPMG block of the T2 experiment (default 10%): ').strip() or "10")*0.01 logscale = CO.options['logscale'] nT1 = CO.nT[0] if len(CO.nT) == 1: nT2 = nT1 else: nT2 = CO.nT[1] print('\nFrom now on you need to run this script on the spectrometer, as it will need to access the acquisition parameters to suggest the lists for the experiment.') info = f_findfs.find_topspin() if not info['found'] or not info['spectrometer']: textcolor('TopSpin spectrometer installation not found. Please make sure you are running this script on the spectrometer and that TopSpin is properly installed.', 'red', bold=True) t1t2ne_utils.the_end(CO) if not hasattr(CO, 'basedir') or CO.basedir is None: CO.basedir = input('Please enter the base directory for the experiment (default is current directory): ') or '.' if not os.path.exists(CO.basedir): raise RuntimeError('The specified base directory does not exist. Please make sure to enter a valid directory.') print('\nStarting the interactive setup of the T1 experiment...') d29 = -np.log(CO.T1red)/R1 print(f'Acquire the reference spectrum first with d31 = 20u and d29 = {d29:.3f} s.') refno = input('Please enter the reference spectrum number (default is 10): ') or '10' path_ref = os.path.join(CO.basedir, refno) if not os.path.exists(path_ref): raise RuntimeError('The specified reference spectrum number does not exist in the base directory. Please make sure to acquire the reference spectrum and enter the correct number.') try: S_ref = kz.Spectrum_1D(path_ref) except Exception as e: raise RuntimeError(f'Error occurred while loading the reference spectrum: {e}') version = t1t2ne_utils.fs_version(S_ref) if version == 'topspin3': S_ref.acqus['GRPDLY'] += 1 S_ref.procs['wf']['mode'] = 'em' S_ref.procs['wf']['lb'] = 1/S_ref.acqus['AQ'] print(f'Line broadening applied: {S_ref.procs["wf"]["lb"]:.2g} Hz') # Zerofill to twice the size S_ref.procs['zf'] = 2 * S_ref.fid.shape[-1] # Apply and do FT S_ref.process() # Remove digital filter S_ref.pknl() S_ref.adjph() S_ref.integrate(filename='refspec') limits = kz.misc.key_to_limits(list(S_ref.integrals.keys())) ta = S_ref.ngdic['acqus']['D'][31] whilecontrol = True iteration = 0 while whilecontrol: print(textcolor(f'To achieve a reduction of the signal intensity of about {expectedratio*100:.0f}%, try setting d31 to ' + t1t2ne_utils.f4(T1_30), 'blue')) expno = input(f'Please enter the experiment number for the T1 experiment once it is done (default is {refno+iteration+1}): ') or f'{refno+iteration+1}' path_t1 = os.path.join(CO.basedir, expno) if not os.path.exists(path_t1): raise RuntimeError('The specified T1 experiment number does not exist in the base directory. Please make sure to acquire the T1 experiment and enter the correct number.') try: S_t1 = kz.Spectrum_1D(path_t1) except Exception as e: raise RuntimeError(f'Error occurred while loading the T1 spectrum: {e}') if version == 'topspin3': S_t1.acqus['GRPDLY'] += 1 S_t1.procs['wf']['mode'] = 'em' S_t1.procs['wf']['lb'] = 1/S_t1.acqus['AQ'] S_t1.procs['zf'] = 2 * S_t1.fid.shape[-1] S_t1.process() S_t1.pknl() S_t1.adjph() S_t1.integrate(filename='t1spec', lims=limits) tb = S_t1.ngdic['acqus']['D'][31] ratio = 0 for key in S_t1.integrals.keys(): ratio += S_t1.integrals[key] / S_ref.integrals[key] * (1/len(S_t1.integrals.keys())) R1_ave = -np.log(ratio) / (tb-ta) print(f'Iteration {iteration}: Estimated R1 from the reference ({ta*1e6:.2f} u) and T1 spectra ({tb:.3f} s) with a ratio of {ratio:.2f}: {R1_ave:.2f} s^-1') T1_30_est = -np.log(expectedratio)/R1_ave if abs(T1_30_est - T1_30) / T1_30 > 0.1: T1_30 = T1_30_est else: whilecontrol = False iteration += 1 print(textcolor(f'Convergence achieved for T1 estimation. Estimated $R1_{{ave}}$ is {R1_ave:.2f} s.', 'green')) print(textcolor(f'Set the recovery delay for the hetnOe experiment in the range {4/R1_ave:.3f} - {6/R1_ave:.3f}.\n', 'blue', bold=False)) T1max = -np.log(CO.T1red)/R1_ave if logscale: vdlist_T1 = [2e-5 - 1/R1_ave * np.log(1-(1-CO.T1red)*i/(nT1-1)) for i in range(nT1)] #logarithmically spaced list from 20u to to T1max else: vdlist_T1 = np.linspace(2e-5, T1max, num=nT1) #linearly spaced list from 20u to to T1max print(textcolor('\nvdlist for T1 experiment:', 'blue')) print('-'*25) if CO.options['randomize']: random.shuffle(vdlist_T1) t1t2ne_utils.out_vdlist(vdlist_T1) print('\nStarting the interactive setup of the T2 experiment...') T2max = -np.log(CO.T2red)/R2 if CO.options['idp'] and not CO.options['small']: CO.options['small'] = True if CO.options['small']: print(textcolor('Using ".idp" sequence, which is optimized for long T2 times. d21 = 750u', 'blue')) d21 = float(input('Enter the d21 value in microseconds (default 750): ').strip() or "750") else: d21 = float(input('Enter the d21 value in microseconds (default 450): ').strip() or "450") p30 = float(input('Enter the p30 value in microseconds (default 160): ').strip() or "160") T2limit = t1t2ne_utils.T2max_duty_cycle(p30=p30, d21=d21) print(textcolor(f'With the chosen d21 and p30 values, the maximum CPMG block allowed by the duty cycle is {T2limit:.2f} s.', 'blue')) if T2max > T2limit: print(textcolor(f'Alert: the longest CPMG block for a residual signal of {T2red*100:.0f}% is {T2max:.2f} s, which is likely too long for any equipment.', 'red', bold=True)) print(textcolor(f'Setting the maximum CPMG block to {T2limit:.3f} s', 'yellow')) T2max = T2limit T2red = 1 - np.exp(-R2*T2max) print(textcolor(f'With this maximum CPMG block, the expected residual signal is {T2red*100:.0f}%.', 'yellow', bold=True)) if CO.options['large']: print(textcolor('You chose the "large" sequence, which is optimized for short T2 times, this option will be disabled.', 'yellow', bold=True)) CO.options['large'] = False d31 = (p30*16+d21*32) if CO.options['large']: d31 = d31/2 T2red_max = 1 - np.exp(-R2*T2limit) T2_30 = min(T2_30, T2limit) l29 = int(T2_30/(d31*1e-6)) l31_hl = int(T2limit/(d31*1e-6)) print(f'Acquire the reference spectrum first with l31 = 0 and l29={l29:d}.') refno = input('Please enter the reference spectrum number (default is 20): ') or '20' path_ref = os.path.join(CO.basedir, refno) if not os.path.exists(path_ref): raise RuntimeError('The specified reference spectrum number does not exist in the base directory. Please make sure to acquire the reference spectrum and enter the correct number.') try: S_ref = kz.Spectrum_1D(path_ref) except Exception as e: raise RuntimeError(f'Error occurred while loading the reference spectrum: {e}') version = t1t2ne_utils.fs_version(S_ref) if version == 'topspin3': S_ref.acqus['GRPDLY'] += 1 S_ref.procs['wf']['mode'] = 'em' S_ref.procs['wf']['lb'] = 1/S_ref.acqus['AQ'] print(f'Line broadening applied: {S_ref.procs["wf"]["lb"]:.2g} Hz') # Zerofill to twice the size S_ref.procs['zf'] = 2 * S_ref.fid.shape[-1] # Apply and do FT S_ref.process() # Remove digital filter S_ref.pknl() S_ref.adjph() S_ref.integrate(filename='refspec') limits = kz.misc.key_to_limits(list(S_ref.integrals.keys())) ta = S_ref.ngdic['acqus']['L'][31]*d31*1e-6 whilecontrol = True iteration = 0 l1_30 = int(T2_30/(d31*1e-6)) if l1_30 <= 0: l1_30 = 1 if l1_30 >= l31_hl: l1_30 = l31_hl R2ave = R2 while whilecontrol: T2red_30 = max(1 - np.exp(-R2ave*l1_30*d31*1e-6), T2red_max) print(textcolor(f'To achieve a reduction of the signal intensity of about {T2red_30*100:.0f}%, try setting l31 to {l1_30:.0f}', 'blue')) expno = input(f'Please enter the experiment number for the T2 experiment once it is done (default is {refno+iteration+1}): ') or f'{refno+iteration+1}' path_t2 = os.path.join(CO.basedir, expno) if not os.path.exists(path_t2): raise RuntimeError('The specified T2 experiment number does not exist in the base directory. Please make sure to acquire the T2 experiment and enter the correct number.') try: S_t2 = kz.Spectrum_1D(path_t2) except Exception as e: raise RuntimeError(f'Error occurred while loading the T2 spectrum: {e}') if version == 'topspin3': S_t2.acqus['GRPDLY'] += 1 S_t2.procs['wf']['mode'] = 'em' S_t2.procs['wf']['lb'] = 1/S_t2.acqus['AQ'] S_t2.procs['zf'] = 2 * S_t2.fid.shape[-1] S_t2.process() S_t2.pknl() S_t2.adjph() S_t2.integrate(filename='t2spec', lims=limits) tb = S_t2.ngdic['acqus']['L'][31]*d31*1e-6 ratio = 0 for key in S_t2.integrals.keys(): ratio += S_t2.integrals[key] / S_ref.integrals[key] * (1/len(S_t2.integrals.keys())) R2_ave = -np.log(ratio) / (tb-ta) print(f'Iteration {iteration}: Estimated R2 from the reference ({ta*1e6:.2f} us) and T2 ({tb*1e3:.2f} ms), with ratio of {ratio:.3f}: {R2_ave:.2f} s^-1') l1_30_est = int((-np.log(expectedratio)/R2_ave)/(d31*1e-6)) if l1_30_est == 0: l1_30_est = 1 if l1_30_est >= l31_hl: l1_30_est = l31_hl if np.abs(l1_30_est - l1_30) > 1: l1_30 = l1_30_est else: whilecontrol = False iteration += 1 print(textcolor(f'Convergence achieved for T2 estimation. Estimated $R2_{{ave}}$ is {R2_ave:.2f} s.', 'green')) T2max = -np.log(T2red)/R2_ave nmax = int(T2max/(d31*1e-6)) if T2max > T2limit: print(textcolor(f'Alert: the longest CPMG block for a residual signal of {T2red*100:.0f}% is {T2max:.2f} s, which is likely too long for any equipment.', 'red', bold=True)) print(textcolor(f'Setting the maximum CPMG block to {T2limit:.3f} s', 'yellow')) T2max = T2limit T2red = 1 - np.exp(-R2*T2max) print(textcolor(f'With this maximum CPMG block, the expected residual signal is {T2red*100:.0f}%.', 'yellow', bold=True)) if CO.options['large']: print(textcolor('You chose the "large" sequence, which is optimized for short T2 times, this option will be disabled.', 'yellow', bold=True)) CO.options['large'] = False #print(f'd21 = {d21} us, p30 = {p30} us, cpmgblock = {d31} us') print('\n') print(textcolor(f'The longest CPMG block for T2 for a residual signal of {T2red*100:.0f}% should be {T2max:.2f} s, with {nmax} loops.' , 'default', bold=True)) #grassetto print(textcolor('Check if this is too long for your equipment before running the experiment', 'red')) #rosso if nT2 > nmax: print(textcolor(f'Warning: the number of increments you chose for the CPMG experiment is {nT2}, which is more than the recommended number of loops {nmax} for a longest CPMG block of {T2max:.2f} s. The list will contain duplicates.', 'yellow')) #giallo if logscale: vclist_T2 = [-1/R2 * np.log(1-(1-T2red)*i/(nT2-1))/(d31*1e-6) for i in range(nT2)] vclist_T2 = np.array(vclist_T2, dtype=np.uint64) #convert to np.uint64 else: vclist_T2 = np.linspace(0,nmax, num=int(nT2), dtype=np.uint64) #linearly spaced list from 0 to nmax vclist_T2 = list(vclist_T2) if CO.options['randomize']: random.shuffle(vclist_T2) print(textcolor('\nvclist for T2 experiment:', 'blue')) print('-'*25) for value in vclist_T2: print(f'{value:d}') print('\n')