#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jul 14 12:53:59 2018

@author: Max Lloyd, UC Berkeley, mklloyd@berkeley.edu

Script to process the csv export from a Qtegra LabBook on the MAT 253 Ultra
at UC Berkeley. The script parses the raw data, organizes sub-integrations
into sample-standard (sa-std) cycles, applies background corrections, applies
a data filter for peak drift, and calculates ratios and delta values.
"""

import os

import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import pandas as pd
from scipy.special import erfinv

mpl.rcParams.update({'mathtext.default': 'regular'})
plt.ion()
plt.close('all')


def get_measurement_params():
    while True:
        try:
            integration_num = int(input(
                'number of integrations in each measurement? '))
            break
        except ValueError:
            print('Not a valid number, try again...')
    while True:
        try:
            cycle_num = int(input('number of cycles in each acquire? '))
            break
        except ValueError:
            print('Not a valid number, try again...')
    while True:
        try:
            integration_time = float(input('Individual integration time? '))
            break
        except ValueError:
            print('Not a valid float, try again... ')
    return(cycle_num, integration_time, integration_num)


def import_individual_Qtegra_file():
    while True:
        d_data_file = input('drag Qtegra csv export file... ').replace(
            '\\ ', ' ').strip("'").strip('"').strip()
        d_data_file = os.path.abspath(d_data_file)
        if os.path.exists(d_data_file) and d_data_file.endswith('.csv'):
            file_name = os.path.basename(d_data_file).split('.csv')[0]
            break
        else:
            print('Not a .csv file ')
    dr, drm, file_name = process_Qtegra_csv_file(d_data_file)
    return(dr, drm, file_name)


def process_Qtegra_csv_file(d_data_file, prompt_for_params=True,
                            integration_time=16.7, integration_num=10,
                            cycle_num=10, sigma_filter=2,
                            input_tail_D2_background=False):
    d_data_file = os.path.abspath(d_data_file)
    if os.path.exists(d_data_file) and d_data_file.endswith('.csv'):
        file_name = os.path.basename(d_data_file).split('.csv')[0]
    else:
        print('Not a .csv file ')
        raise TypeError
    d = pd.read_csv(d_data_file, sep=None, engine='python')
    print('Valid file, now calculating...')
    dt = d.transpose()
    dt['f'] = dt[3].astype(float, errors='ignore')
    i = dt['f'].str.extract(
        r'(\-*[0-9]+\.[0-9]+)', expand=False).dropna().index
    sigs = dt.loc[i, 'f'].astype(float)
    meta = pd.DataFrame(dt.loc[i, 1].str.split(':').values.tolist(),
                        index=i, columns=['block', 'meas', 'peak'])
    measure_line = dt.loc[i, 0].astype(int)
    d = pd.concat([measure_line, meta, sigs], axis=1)
    d['block'] = d['block'].astype(np.int32)
    # measurements with backgrounds will contain multiple blocks,
    # so assume the largest block is a measurement block
    # and assume all others are backgrounds
    measure_block = d.groupby('block').count().idxmax()['peak']
    db = d.loc[d['block'] != measure_block, :]
    d = d.loc[d['block'] == measure_block, :]
    dr = pd.DataFrame(data=d.loc[d['peak'] == '12CH4', :])
    dr.columns = ['measure_line', 'block', 'meas', 'peak', 'i16']
    dr['measure_line'] = dr['measure_line'] - dr.iloc[0, :]['measure_line']
    dr['block'] = dr['block'].astype(int)
    dr['meas_line'] = dr['measure_line'].copy()
    dr.set_index('meas_line', inplace=True)
    if prompt_for_params:
        cycle_num, integration_time, integration_num = get_measurement_params()
    dr = sort_by_cycles(dr, integration_num, cycle_num)
    # now, do the same for the background db
    if db.size > 0:
        dbr = pd.DataFrame(data=db.loc[db['peak'].isin(
            ['b_i16', 'b2_i16', 'frag13_i16']), :])
        dbr.columns = ['measure_line', 'block', 'meas', 'peak', 'i16']
        dbr['measure_line'] = dbr['measure_line'] - dbr.iloc[0, :]['measure_line']
        dbr['block'] = dbr['block'].astype(int)
        dbr['meas_line'] = dbr['measure_line'].copy()
        dbr.set_index('meas_line', inplace=True)
        background_masses = ['i17', 'i18']
        # prep columns for big background data merge
        db.rename(columns={0: 'meas_line'}, inplace=True)
        db['meas_line'] = db['meas_line'] - db.iloc[0, :]['meas_line']
        for background_mass in background_masses:
            peaks_to_merge = []
            for p in db['peak'].unique():
                if background_mass in p:
                    peaks_to_merge.append(p)
            dbr = dbr.join(
                db.loc[db.peak.isin(peaks_to_merge),
                       ['meas_line', 'f']].set_index('meas_line').rename(
                           columns={'f': background_mass}),
                how='left', sort=False).fillna(value=0.0)
        dbr = sort_by_cycles(dbr, integration_num, 2)
        dbr['is_outlier'] = False
        # need to treat backgrounds differently if clumped 13C, or D
        b_blocks = dbr['block'].unique()
        # first, filter for signal stability per background block
        dbr = filter_for_signal_stability(dbr, integration_num,
                                          sigma_filter=3, count_backwards=30,
                                          signals_to_use=['i16'],
                                          b_block=b_blocks[-1])
        # then again per sample or std
        dbr = filter_for_signal_stability_per_side(dbr, sigma_filter=2,
                                                   count_backwards=2,
                                                   signals_to_use=['i16'],
                                                   b_block=b_blocks[-1])
        # if more than one background block
        if len(b_blocks) > 1:
            dbr = filter_for_signal_stability(dbr, integration_num,
                                              sigma_filter=3,
                                              count_backwards=30,
                                              signals_to_use=['i16'],
                                              b_block=b_blocks[-2])
            dbr = filter_for_signal_stability_per_side(dbr, sigma_filter=10,
                                                       count_backwards=2,
                                                       signals_to_use=['i16'],
                                                       b_block=b_blocks[-2])
            # i16 background comes from last background block
            dbg_16 = dbr.loc[(dbr['block'] == b_blocks[-1]) & (
                dbr['is_outlier'] == False), :].groupby('is_sample')
            # i17 and i18 will come from previous
            dbg = dbr.loc[(dbr['block'] == b_blocks[-1]) & (
                dbr['is_outlier'] == False), :].groupby('is_sample')
        # else, everybody comes from same one
        else:
            dbg_16 = dbr.loc[(dbr['block'] == b_blocks[-1]) & (
                dbr['is_outlier'] == False), :].groupby('is_sample')
            dbg = dbr.loc[(dbr['block'] == b_blocks[-1]) & (
                dbr['is_outlier'] == False), :].groupby('is_sample')
        # now, apply mass 16 backgrounds
        # first, copy old
        dr['i16_raw'] = dr['i16'].copy()
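        # Added explanatory comment (a sketch of what the test below does,
        # not an independent method): the sample-side and standard-side
        # background means are treated as significantly different when
        #     |mean_sa - mean_std| > 3*(std_sa/sqrt(n_sa) + std_std/sqrt(n_std)),
        # i.e. when they differ by more than the sum of their 3-sigma
        # standard errors; only then is the user prompted for how to apply
        # the correction.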
        # check if backgrounds are significantly different at 3-sigma level:
        if np.abs(dbg_16['i16'].mean()[1] - dbg_16['i16'].mean()[0]) > (
                3*dbg_16['i16'].std()/np.sqrt(dbg_16['i16'].count())).sum():
            print(('Backgrounds on {0} are significantly different outside '
                   'of uncertainty \n std: {1:.3f}, sa: {2:.3f}, pm {3:.3f}'
                   ).format('12CH4', dbg_16['i16'].mean()[0],
                            dbg_16['i16'].mean()[1],
                            (3*dbg_16['i16'].std()/np.sqrt(
                                dbg_16['i16'].count())).sum()))
            bckgrnd_choice = input(('Apply (s)ame, (d)ifferent, '
                                    'or (m)anual background corrections? '
                                    )).lower()
            # then, apply to sample and standard separately
            if bckgrnd_choice == 'd':
                dr.loc[dr['is_sample'] == False, 'i16'] = dr.loc[
                    dr['is_sample'] == False, 'i16_raw'] - dbg_16[
                        'i16'].mean()[0]
                dr.loc[dr['is_sample'] == True, 'i16'] = dr.loc[
                    dr['is_sample'] == True, 'i16_raw'] - dbg_16[
                        'i16'].mean()[1]
            elif bckgrnd_choice == 'm':
                i16_bg_input = input(
                    'Input new background for mass 16... ').strip()
                dr.loc[dr['is_sample'] == False, 'i16'] = dr.loc[
                    dr['is_sample'] == False, 'i16_raw'] - float(i16_bg_input)
                dr.loc[dr['is_sample'] == True, 'i16'] = dr.loc[
                    dr['is_sample'] == True, 'i16_raw'] - float(i16_bg_input)
            else:
                # otherwise, apply the same to both
                dr.loc[dr['is_sample'] == False, 'i16'] = dr.loc[
                    dr['is_sample'] == False, 'i16_raw'] - dbg_16[
                        'i16'].mean().mean()
                dr.loc[dr['is_sample'] == True, 'i16'] = dr.loc[
                    dr['is_sample'] == True, 'i16_raw'] - dbg_16[
                        'i16'].mean().mean()
        else:
            # otherwise, apply the same to both
            dr.loc[dr['is_sample'] == False, 'i16'] = dr.loc[
                dr['is_sample'] == False, 'i16_raw'] - dbg_16[
                    'i16'].mean().mean()
            dr.loc[dr['is_sample'] == True, 'i16'] = dr.loc[
                dr['is_sample'] == True, 'i16_raw'] - dbg_16[
                    'i16'].mean().mean()
        while True:
            try:
                dbr.to_excel('backgrounds_clump.xlsx')
                break
            except PermissionError:
                close_sheet = input(('Spreadsheet: backgrounds_clump.xlsx '
                                     'is open. \n Close it and press ENTER '
                                     'to continue... '))
                continue
        while True:
            try:
                dbg.mean()[['i16', 'i17', 'i18']].to_excel(
                    'backgrounds_mean.xlsx')
                break
            except PermissionError:
                close_sheet = input(('Spreadsheet: backgrounds_mean.xlsx '
                                     'is open. \n Close it and press ENTER '
                                     'to continue... '))
                continue
    d.rename(columns={0: 'meas_line'}, inplace=True)
    for peak in d['peak'].unique():
        this_peak = 'i_' + peak
        # one-liner to find peaks and merge, with correct name
        # and fill NaNs as 0.0
        dr = dr.join(d.loc[d.peak == peak, ['meas_line', 'f']].set_index(
            'meas_line').rename(columns={'f': this_peak}),
            how='left', sort=False).fillna(value=0.0)
        if peak in ['13CH4', '12CH3D', '13CH34']:
            dr['i17_raw'] = dr[this_peak]
            dr['i17'] = dr['i17_raw'].copy()
            # if background scans processed:
            if db.size > 0:
                # first, check if backgrounds are significantly different
                # at the 3-sigma level:
                if np.abs(dbg['i17'].mean()[1] - dbg['i17'].mean()[0]) > (
                        3*dbg['i17'].std()/np.sqrt(dbg['i17'].count())).sum():
                    print(('Backgrounds on {0} are significantly different '
                           'outside of uncertainty \n std: {1:.3f}, sa: '
                           '{2:.3f}, pm {3:.3f}').format(
                               peak, dbg['i17'].mean()[0],
                               dbg['i17'].mean()[1],
                               (3*dbg['i17'].std()/np.sqrt(
                                   dbg['i17'].count())).sum()))
                    i17_bg_input = input(('Input new background for i17 or '
                                          'press Enter to use '
                                          'existing ones...')).strip()
                    if len(i17_bg_input) > 0:
                        dr.loc[dr['is_sample'] == False, 'i17'] = dr.loc[
                            dr['is_sample'] == False, 'i17'] - float(
                                i17_bg_input)
                        dr.loc[dr['is_sample'] == True, 'i17'] = dr.loc[
                            dr['is_sample'] == True, 'i17'] - float(
                                i17_bg_input)
                    else:
                        # then, apply to sample and standard separately
                        dr.loc[dr['is_sample'] == False, 'i17'] = dr.loc[
                            dr['is_sample'] == False, 'i17'] - dbg[
                                'i17'].mean()[0]
                        dr.loc[dr['is_sample'] == True, 'i17'] = dr.loc[
                            dr['is_sample'] == True, 'i17'] - dbg[
                                'i17'].mean()[1]
                else:
                    dr.loc[dr['is_sample'] == False, 'i17'] = dr.loc[
                        dr['is_sample'] == False, 'i17'] - dbg[
                            'i17'].mean().mean()
                    dr.loc[dr['is_sample'] == True, 'i17'] = dr.loc[
                        dr['is_sample'] == True, 'i17'] - dbg[
                            'i17'].mean().mean()
            dr['R17'] = dr['i17']/dr['i16']
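        # Added note (descriptive comment only): the branch below mirrors the
        # mass-17 handling above for the mass-18 beam (peaks named 12CH2D2 or
        # 13CH3D); after any background correction it forms R18 = i18/i16.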
        elif peak in ['12CH2D2', '13CH3D']:
            dr['i18_raw'] = dr[this_peak]
            dr['i18'] = dr['i18_raw'].copy()
            # if background scans processed:
            if db.size > 0:
                if peak == '13CH3D':
                    # first, check if backgrounds are significantly different
                    # at the 3-sigma level:
                    if np.abs(dbg['i18'].mean()[1] - dbg['i18'].mean()[0]) > (
                            3*dbg['i18'].std()/np.sqrt(dbg['i18'].count())
                            ).sum():
                        print(('Backgrounds on {0} are significantly '
                               'different outside of uncertainty \n '
                               'std: {1:.3f}, sa: {2:.3f}, pm {3:.3f}').format(
                                   peak, dbg['i18'].mean()[0],
                                   dbg['i18'].mean()[1],
                                   (3*dbg['i18'].std()/np.sqrt(
                                       dbg['i18'].count())).sum()))
                        bckgrnd_choice = input(
                            ('Apply (s)ame, (d)ifferent, or (m)anual '
                             'background corrections? ')).lower()
                        if bckgrnd_choice == 'd':
                            # then, apply to sample and standard separately
                            dr.loc[dr['is_sample'] == False, 'i18'] = dr.loc[
                                dr['is_sample'] == False, 'i18'] - dbg[
                                    'i18'].mean()[0]
                            dr.loc[dr['is_sample'] == True, 'i18'] = dr.loc[
                                dr['is_sample'] == True, 'i18'] - dbg[
                                    'i18'].mean()[1]
                        elif bckgrnd_choice == 'm':
                            i18_bg_input = input(
                                'Input new background for i18... ').strip()
                            dr.loc[dr['is_sample'] == False, 'i18'] = dr.loc[
                                dr['is_sample'] == False, 'i18'] - float(
                                    i18_bg_input)
                            dr.loc[dr['is_sample'] == True, 'i18'] = dr.loc[
                                dr['is_sample'] == True, 'i18'] - float(
                                    i18_bg_input)
                        else:
                            dr.loc[dr['is_sample'] == False, 'i18'] = dr.loc[
                                dr['is_sample'] == False, 'i18'] - dbg[
                                    'i18'].mean().mean()
                            dr.loc[dr['is_sample'] == True, 'i18'] = dr.loc[
                                dr['is_sample'] == True, 'i18'] - dbg[
                                    'i18'].mean().mean()
                    else:
                        dr.loc[dr['is_sample'] == False, 'i18'] = dr.loc[
                            dr['is_sample'] == False, 'i18'] - dbg[
                                'i18'].mean().mean()
                        dr.loc[dr['is_sample'] == True, 'i18'] = dr.loc[
                            dr['is_sample'] == True, 'i18'] - dbg[
                                'i18'].mean().mean()
                if peak == '12CH2D2':
                    if input_tail_D2_background:
                        print(('Measured scattered ion background on 12CH2D2 '
                               'is: {0:.3f} cps').format(
                                   dbg['i18'].mean().mean()))
                        D2_tail_background_wg = input(
                            ('Input total background for 12CH2D2 on WG \n '
                             'or press ENTER to continue... ')).strip()
                        D2_tail_background_sample = input(
                            ('Input total background for 12CH2D2 on SAMPLE \n '
                             'or press ENTER to continue...')).strip()
                        if len(D2_tail_background_wg) > 0:
                            if len(D2_tail_background_sample) == 0:
                                D2_tail_background_sample = D2_tail_background_wg
                            print(('Applying backgrounds of {0:.3f} and '
                                   '{1:.3f} cps to wg and sample 12CH2D2 '
                                   'peaks, respectively').format(
                                       float(D2_tail_background_wg),
                                       float(D2_tail_background_sample)))
                            # if a tail background was added,
                            # apply it to the signals on both sides
                            dr.loc[dr['is_sample'] == False, 'i18'] = dr.loc[
                                dr['is_sample'] == False, 'i18'] - float(
                                    D2_tail_background_wg)
                            dr.loc[dr['is_sample'] == True, 'i18'] = dr.loc[
                                dr['is_sample'] == True, 'i18'] - float(
                                    D2_tail_background_sample)
            dr['R18'] = dr['i18']/dr['i16']
    dr['integration_time'] = integration_time
    # Calculate the confidence interval expected given the number of
    # observations, i.e., the sigma window in which all observations
    # should fall.
    proportional_confidence_interval = np.sqrt(2)*erfinv(
        1.0 - 1.0/(integration_num*2.0))
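    # Worked example of the line above (added comment, not new logic): with
    # the default integration_num = 10, 1 - 1/(2*10) = 0.95 and
    # sqrt(2)*erfinv(0.95) is roughly 1.96, i.e. the z-score within which the
    # most extreme of ~10 normally distributed sub-integrations is expected
    # to fall. The max-ratio filter below uses twice this window (about
    # 3.9 sigma) as its cutoff.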
    dr = filter_for_outliers(dr, sigma_filter=sigma_filter)
    if db.size > 0:
        dr = filter_for_max_ratios(
            dr, sigma_filter=proportional_confidence_interval*2)
    else:
        dr = filter_for_max_ratios(
            dr, sigma_filter=proportional_confidence_interval*2)
    if db.size > 0 and 'frag13_i16' in dbr['peak'].unique():
        frag_rate = calculate_k_factor(dr, dbr, dbg_16, plot_results=True)
    drm = calculate_deltas(dr)
    return(dr, drm, file_name)


def calculate_k_factor(dr, dbr, dbg_16, plot_results=False):
    frags = dbr.loc[(dbr['peak'] == 'frag13_i16'),
                    ['measure_line', 'is_sample', 'is_outlier', 'i16']].copy()
    peaks = dr.loc[(dr['measure_line'].max() - dr['measure_line'] < 5),
                   ['measure_line', 'is_sample', 'is_outlier', 'i17']].copy()
    # divide up sample vs std
    frags.loc[frags['is_sample'] == True, 'i16_sa'] = frags.loc[
        (frags['is_sample'] == True) & (frags['is_outlier'] == False),
        'i16'] - dbg_16['i16'].mean()[1]
    frags.loc[frags['is_sample'] == False, 'i16_std'] = frags.loc[
        (frags['is_sample'] == False) & (frags['is_outlier'] == False),
        'i16'] - dbg_16['i16'].mean()[0]
    # same for i17
    peaks.loc[peaks['is_sample'] == True, 'i17_sa'] = peaks.loc[
        (peaks['is_sample'] == True) & (peaks['is_outlier'] == False), 'i17']
    peaks.loc[peaks['is_sample'] == False, 'i17_std'] = peaks.loc[
        (peaks['is_sample'] == False) & (peaks['is_outlier'] == False), 'i17']
    # now, append frags to the end of peaks.
    # Timing is such that this works out perfectly because
    # both peaks and frags took the same amount of time;
    # this corrects for bleedout
    pf = peaks.append(frags, ignore_index=True, sort=False)
    if plot_results:
        fig, ax = plt.subplots(2, figsize=(8, 6))
        for i in ['std', 'sa']:
            this_fit_17 = np.polyfit(pf['i17_' + i].dropna().index,
                                     pf['i17_' + i].dropna(), 1)
            pf['i17_fit_' + i] = this_fit_17[0]*pf.index + this_fit_17[1]
            pf['k_factor_' + i] = pf['i16_' + i]/pf['i17_fit_' + i]
            print('{0} frag rate = {1:.4f} +/- {2:.4f}'.format(
                i, pf['k_factor_' + i].mean(), pf['k_factor_' + i].std()))
            ax[0].plot(pf.index, pf['i17_' + i], '.', label='i17_' + i)
            ax[0].plot(pf.index, pf['i16_' + i], '.', label='i16_' + i)
            ax[0].plot(pf.index, pf['i17_fit_' + i], '--',
                       label='i17_fit_' + i)
            ax[1].plot(pf.index, pf['k_factor_' + i], '.',
                       label='k_factor_' + i)
        ax[1].set_xlabel('index')
        ax[0].set_ylabel('i (cps)')
        ax[1].set_ylabel('frag rate')
        ax[0].legend(loc='best')
        ax[1].legend(loc='best')
        fig.savefig('frag_rate_calcs.pdf')
    else:
        for i in ['std', 'sa']:
            this_fit_17 = np.polyfit(pf['i17_' + i].dropna().index,
                                     pf['i17_' + i].dropna(), 1)
            pf['i17_fit_' + i] = this_fit_17[0]*pf.index + this_fit_17[1]
            pf['k_factor_' + i] = pf['i16_' + i]/pf['i17_fit_' + i]
            print('{0} frag rate = {1:.4f} +/- {2:.4f}'.format(
                i, pf['k_factor_' + i].mean(), pf['k_factor_' + i].std()))
    k_factor_weighted = (
        pf['k_factor_std'].mean()*pf['k_factor_std'].count() +
        pf['k_factor_sa'].mean()*pf['k_factor_sa'].count())/(
            pf['k_factor_std'].count() + pf['k_factor_sa'].count())
    print('mean frag rate = {0:.4f}'.format(k_factor_weighted))
    np.savetxt('frag_rate.txt', np.array([k_factor_weighted]))
    return(k_factor_weighted)


def sort_by_cycles(dr, integration_num, cycle_num):
    dr['integration_number'] = (
        dr['measure_line'] % integration_num).astype(int)
    dr['measure_line'] = (
        dr['measure_line']/integration_num).apply(np.floor).astype(int)
    dr['cycle_number'] = (
        dr['measure_line'] % (cycle_num*2 + 1)).astype(int)
    dr['acq_number'] = (dr['measure_line']/(cycle_num*2 + 1)).astype(int)
    dr['is_sample'] = dr['cycle_number'] % 2
    return(dr)
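
# Added illustration of sort_by_cycles (comment only): with the default
# integration_num = 10 and cycle_num = 10, raw lines 0-9 become
# integration_number 0-9 of half-cycle 0, lines 10-19 belong to half-cycle 1,
# and so on. cycle_number = measure_line % 21 and is_sample = cycle_number % 2,
# so each acquisition is 21 half-cycles that start and end on the standard
# (std, sa, std, ..., std).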

def filter_for_outliers(dr, sigma_filter=3):
    dg = dr.groupby('measure_line')
    drm = dr.set_index('measure_line')
    sigs_to_rd = {'i17': ('R17', 'd17'), 'i18': ('R18', 'd18')}
    for i in sigs_to_rd.keys():
        if i in dr.columns:
            filter_ratio, d = sigs_to_rd[i]
            dr['is_outlier'] = ((np.abs(
                drm[filter_ratio] - dg[filter_ratio].mean()
                ))/dg[filter_ratio].std() > sigma_filter
                ).reset_index()[filter_ratio].copy()
            dr[filter_ratio+'_unfiltered'] = dr[filter_ratio].copy()
            dr.loc[dr['is_outlier'], filter_ratio] = np.nan
    return(dr)


def filter_for_max_ratios(dr, sigma_filter=4, dbr=[]):
    dg = dr.groupby('measure_line')
    drm = dr.set_index('measure_line')
    sigs_to_rd = {'i17': ('R17', 'd17'), 'i18': ('R18', 'd18')}
    for i in sigs_to_rd.keys():
        if i in dr.columns:
            shot_noise_signal = i
            frf, d = sigs_to_rd[i]
            filter_ratio = frf + '_unfiltered'
            if len(dbr) == 0:
                johnson_noise = 2000
                shot_noise_std_devs = np.sqrt(
                    dg[shot_noise_signal].mean()*dg[
                        'integration_time'].mean() + johnson_noise**2)/(
                            dg['i16'].mean())
            else:
                b_blocks = dbr['block'].unique()
                shot_noise_std_devs = np.sqrt(
                    dg[shot_noise_signal].mean()*dg[
                        'integration_time'].mean() + dbr.loc[
                            (dbr['is_outlier'] == False) & (
                                dbr['block'] == b_blocks[-1]),
                            shot_noise_signal].std()**2)/(dg['i16'].mean())
            # use third-highest to buffer against strays
            dr['is_off_peak'] = ((dg[filter_ratio].apply(
                lambda x: x.sort_values().iloc[-3]) - drm[filter_ratio])
                / shot_noise_std_devs > sigma_filter
                ).reset_index()[0].copy()
            dr[frf+'_on_peak'] = dr[filter_ratio].copy()
            dr.loc[dr['is_off_peak'], frf + '_on_peak'] = np.nan
    return(dr)


def filter_for_signal_stability(dbr, integration_num, sigma_filter=3,
                                count_backwards=30, signals_to_use=['i16'],
                                b_block=2):
    '''filters backgrounds based on the last 30 integrations of each cycle,
    because of pressure switching issues during the first ones'''
    # group based on the last 30 integrations in each cycle
    dbg_last = dbr.loc[(dbr['integration_number'] > (
        integration_num - count_backwards)) & (dbr['block'] == b_block),
        :].groupby('measure_line')
    dbr_m = dbr.loc[(dbr['block'] == b_block), :].set_index('measure_line')
    for i in signals_to_use:
        if i in dbr.columns:
            dbr.loc[dbr['block'] == b_block, 'is_outlier'] = (
                (dbr_m[i] - dbg_last[i].mean())/dbg_last[i].std()
                > sigma_filter).reset_index().set_index(dbr.loc[
                    dbr['block'] == b_block].index)[i].copy()
    return(dbr)


def filter_for_signal_stability_per_side(dbr, sigma_filter=3,
                                         count_backwards=2,
                                         signals_to_use=['i16'], b_block=3):
    '''filters backgrounds separately for the sample and standard sides,
    based on the last few half-cycles on each side'''
    # group based on the last integration block on each side
    dbg_last = dbr.loc[(dbr['measure_line'] > (dbr.loc[
        dbr.block == b_block, 'measure_line'].max() - count_backwards)) & (
            dbr['block'] == b_block) & (dbr['is_outlier'] == False),
        :].groupby('is_sample')
    for i in signals_to_use:
        if i in dbr.columns:
            for j in [0, 1]:
                dbr.loc[(dbr['block'] == b_block) & (dbr['is_sample'] == j) & (
                    dbr['is_outlier'] == False), 'is_outlier'] = (
                        dbr.loc[(dbr['block'] == b_block) & (
                            dbr['is_sample'] == j) & (
                                dbr['is_outlier'] == False), i]
                        - dbg_last[i].mean()[j]
                        )/dbg_last[i].std()[j] > sigma_filter
    return(dbr)
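
# Added note on the delta calculation below (comment only): for each sample
# half-cycle i, calculate_deltas brackets it with the adjacent standard
# half-cycles, e.g.
#     d17_i = (R17_i / ((R17_(i-1) + R17_(i+1)) / 2) - 1) * 1000,
# and P_imbalance is the analogous quantity for the i16 signal, expressed
# in percent.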

def calculate_deltas(dr):
    sigs_to_rd = {'i17': ('R17', 'd17'), 'i18': ('R18', 'd18')}
    cols_needed = ['measure_line', 'cycle_number', 'acq_number', 'is_sample',
                   'integration_time', 'i16']
    for i in sigs_to_rd.keys():
        if i in dr.columns:
            r, d = sigs_to_rd[i]
            dr[r + '_std'] = dr.loc[dr['is_sample'] == 0, r]
            dr[r + '_sample'] = dr.loc[dr['is_sample'] == 1, r]
            cols_needed.extend([i, r + '_unfiltered', r + '_on_peak', r])
    dg = dr.groupby('measure_line')
    drm = pd.DataFrame(data=dg[cols_needed].mean())
    drm['P_imbalance'] = np.nan
    i_sample = drm.loc[drm['is_sample'] == 1, 'measure_line'].values
    drm['percent_on_peak'] = dr.loc[dr['is_off_peak'] == False].groupby(
        'measure_line')['is_off_peak'].count()/dr.groupby(
            'measure_line')['is_off_peak'].count()*100
    drm.loc[i_sample, 'P_imbalance'] = (drm.loc[i_sample, 'i16']/(
        (drm.loc[i_sample - 1, 'i16'].values +
         drm.loc[i_sample + 1, 'i16'].values)/2) - 1)*100
    for i in sigs_to_rd.keys():
        if i in dr.columns:
            r, d = sigs_to_rd[i]
            dr[r+'_std'] = dr.loc[dr['is_sample'] == 0, r]
            dr[r+'_sample'] = dr.loc[dr['is_sample'] == 1, r]
            drm[d] = np.nan
            drm[d+'_on_peak'] = np.nan
            drm[d+'_unfiltered'] = np.nan
            drm.loc[i_sample, d] = (drm.loc[i_sample, r]/((
                drm.loc[i_sample - 1, r].values +
                drm.loc[i_sample + 1, r].values)/2) - 1)*1000
            drm.loc[i_sample, d+'_unfiltered'] = (drm.loc[
                i_sample, r+'_unfiltered']/((drm.loc[
                    i_sample - 1, r+'_unfiltered'].values + drm.loc[
                        i_sample + 1, r+'_unfiltered'].values)/2) - 1)*1000
            drm.loc[i_sample, d+'_on_peak'] = (drm.loc[
                i_sample, r+'_on_peak']/((drm.loc[
                    i_sample - 1, r+'_on_peak'].values + drm.loc[
                        i_sample + 1, r+'_on_peak'].values)/2) - 1)*1000
    return(drm)


def export_data(dr, drm, file_name, include_summary=True):
    cols_to_export = ['measure_line', 'block', 'meas', 'cycle_number',
                      'is_sample', 'integration_number', 'is_outlier',
                      'is_off_peak', 'i16_raw', 'i16']
    sigs_to_rd = {'i17': ('R17', 'd17'), 'i18': ('R18', 'd18')}
    for i in sigs_to_rd.keys():
        if i in dr.columns:
            r, d = sigs_to_rd[i]
            cols_to_export.extend([i+'_raw', i, r+'_unfiltered', r,
                                   r+'_on_peak', r+'_std', r+'_sample'])
    cols_to_export_all = cols_to_export.copy()
    dr.to_excel('{0}_processed_export_all.xlsx'.format(file_name),
                index=False, columns=cols_to_export_all,
                freeze_panes=(1, 0), header=True)
    return()
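
# Added summary of the function below (comment only): parse_log_file reads a
# Qtegra log that has been exported to .xlsx, pulls out the pressure-adjust
# start/stop messages and the peak-center results (center mass, delta offset,
# mass offset) reported in UserInfo/UserError messages, and writes each set
# to its own spreadsheet.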

def parse_log_file():
    while True:
        log_file_name = input('drag excel log file... ').replace(
            '\\ ', ' ').strip("'").strip('"').strip()
        log_file_name = os.path.abspath(log_file_name)
        if os.path.exists(log_file_name) and log_file_name.endswith('.xlsx'):
            file_name = os.path.basename(log_file_name).split('.xlsx')[0]
            break
        else:
            print('Not an .xlsx file ')
    d = pd.read_excel(log_file_name, header=None,
                      names=['Logged at', 'Level', 'Message', 'Time',
                             'Category', 'Sub Category'])
    p_adjust_start = 'Start Volume Adjust'
    p_adjust_stop = 'Pressure adjust finished'
    start = d.loc[d['Message'].str.contains(p_adjust_start), 'Time']
    stop = d.loc[d['Message'].str.contains(p_adjust_stop), 'Time']
    i_p_adjust = (d['Message'].str.contains(p_adjust_start) | d[
        'Message'].str.contains(p_adjust_stop))
    del_t = stop.values - start.values
    i_peak_center = d['Message'].str.lower().str.contains('peak center')
    i_pc = d.loc[(i_peak_center) & (d['Level'].isin([
        'UserInfo', 'UserError'])), :].index
    i_pc_success = d.loc[(i_peak_center) & (d['Level'] == 'UserInfo'), :].index
    if len(i_pc_success) > 0:
        match_string = r'([0-9]+\.[0-9]+[E-]*[0-9]+)'
        mass_data = d.loc[i_pc_success, 'Message'].str.extractall(
            match_string).unstack()[0]
        center_mass = mass_data[0]
        delta_offset = mass_data[1]
        mass_offset = mass_data[2]
        d['center_mass'] = np.nan
        d.loc[center_mass.index, 'center_mass'] = center_mass.astype(float)
        d['delta_offset'] = np.nan
        d.loc[delta_offset.index, 'delta_offset'] = delta_offset.astype(float)
        d['mass_offset'] = np.nan
        d.loc[mass_offset.index, 'mass_offset'] = mass_offset.astype(float)
    d.loc[i_pc, ['Time', 'Message', 'center_mass', 'delta_offset',
                 'mass_offset']].to_excel(
        '{0}_peak_center_data.xlsx'.format(file_name))
    d_pc = d.loc[i_pc, ['Time', 'Message', 'center_mass', 'delta_offset',
                        'mass_offset']]
    d.loc[i_p_adjust, ['Time', 'Message']].to_excel(
        '{0}_pressure_adjust_data.xlsx'.format(file_name))
    return(d, d_pc)


def plot_ratios(dr, file_name, sigma_filter=4, integration_time=0.27):
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(dr.loc[dr['is_sample'] == 0].index,
            dr.loc[dr['is_sample'] == 0, 'R17_unfiltered'], 'o',
            alpha=0.5, color='C0')
    ax.plot(dr.loc[dr['is_sample'] == 0].index,
            dr.loc[dr['is_sample'] == 0, 'R17'], 's', color='C0')
    ax.plot(dr.loc[dr['is_sample'] == 1].index,
            dr.loc[dr['is_sample'] == 1, 'R17_unfiltered'], 'o',
            alpha=0.5, color='C1')
    ax.plot(dr.loc[dr['is_sample'] == 1].index,
            dr.loc[dr['is_sample'] == 1, 'R17'], 's', color='C1')
    for i in range(dr['cycle_number'].max() + 1):
        shot_noise_st_devs = np.sqrt(
            dr.loc[dr['cycle_number'] == i, 'i17'].mean()*integration_time
            )/(dr.loc[dr['cycle_number'] == i, 'i16'].mean()*integration_time)
        ax.plot(dr.loc[dr['cycle_number'] == i].index,
                np.tile(dr.loc[dr['cycle_number'] == i, 'R17'].max(),
                        dr.loc[dr['cycle_number'] == i].index.size),
                '-', color='C2')
        ax.plot(dr.loc[dr['cycle_number'] == i].index,
                np.tile(dr.loc[dr['cycle_number'] == i, 'R17'].max() -
                        shot_noise_st_devs*sigma_filter,
                        dr.loc[dr['cycle_number'] == i].index.size),
                '--', color='C2')
    ax.set_xlabel('number')
    ax.set_ylabel(r'$^{17}R$')
    fig.savefig('data.pdf')
    return


def get_list_of_files_to_import():
    acq_name = input('Drag all Qtegra files to process ').replace(
        '\\ ', ' ').strip("'").strip('"').strip()
    acq_name = acq_name.strip().strip("'").strip('"')
    acq_name_list = acq_name.split(' ')
    acq_name_list = [l.strip(',').strip("'") for l in acq_name_list]
    return(acq_name_list)
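
# Added overview of the interactive batch workflow below (comment only):
# prompt for a list of Qtegra csv exports and the acquisition parameters,
# process and export each file in turn, optionally merge peak-center data
# parsed from the instrument log, and finally concatenate everything into
# d_data_all.xlsx and d_data_all_summary.xlsx.
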
drs = []
drms = []
file_names = []
acq_name_list = get_list_of_files_to_import()
cycle_num, integration_time, integration_num = get_measurement_params()
for i in acq_name_list:
    dr, drm, file_name = process_Qtegra_csv_file(
        i, sigma_filter=3, prompt_for_params=False, cycle_num=cycle_num,
        integration_time=integration_time, integration_num=integration_num,
        input_tail_D2_background=True)
    drs.append(dr)
    drms.append(drm)
    file_names.append(file_name)
    export_data(dr, drm, file_name, include_summary=True)
    if input('parse log file for {0}: (y/n)? '.format(
            file_name)).lower() == 'y':
        log_file, peak_centers = parse_log_file()
        drms[-1].insert(5, 'center_mass', peak_centers['center_mass'].values)
        drms[-1].insert(6, 'delta_offset',
                        peak_centers['delta_offset'].values)
        drms[-1].insert(7, 'mass_offset', peak_centers['mass_offset'].values)
        drms[-1]['max_delta_offset'] = np.nan
        i_sample = drms[-1].loc[drms[-1]['is_sample'] == True].index
        drms[-1].loc[i_sample, 'max_delta_offset'] = np.stack([
            drms[-1].loc[i_sample, 'delta_offset'].values,
            drms[-1].loc[i_sample + 1, 'delta_offset'].values]).max(axis=0)

# consolidate all data frames
dr_all = drs[0].copy()
drm_all = drms[0].copy()
for i in range(1, len(drs)):
    print(i)
    dr_all = dr_all.append(drs[i])
    drm_all = drm_all.append(drms[i])
# re-index to preserve full order
dr_all.reset_index(drop=False, inplace=True)
drm_all.reset_index(drop=True, inplace=True)
dr_all.to_excel('d_data_all.xlsx', freeze_panes=(1, 0), header=True)
drm_all.to_excel('d_data_all_summary.xlsx', freeze_panes=(1, 0), header=True)
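
# A minimal, hedged usage sketch (commented out; not part of the interactive
# workflow above): processing a single export non-interactively with assumed
# acquisition parameters. 'my_acquisition.csv' is a hypothetical file name.
#
# dr, drm, file_name = process_Qtegra_csv_file(
#     'my_acquisition.csv', prompt_for_params=False, cycle_num=10,
#     integration_time=16.7, integration_num=10, sigma_filter=3)
# export_data(dr, drm, file_name)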