


""" __slots__ = "_tuftable" def __init__(self, *args, **kwargs): """ Parameters ---------- kwargs: Can be used to pass partial list of of tuftables straight to the super().__init__ as implemented by xarray.Dataset """ super().__init__(*args, **kwargs) def properties(self): return self._tuftable @classmethod def open_store( cls, explorer_url:str=None, project:str=None, store_name:str=None, directory:str=None, basepath:str=None, data_store:Store=None ) -> Dataset: """ Open a store with the given explorer_url, project and store_name Parameters ---------- explorer_url project store_name directory """ return File(store_name).to_xarray() @classmethod def xropen(cls, *args, **kwargs) -> Dataset: config = get_aws_config() if 'use_remote_fs' in os.environ and os.environ['use_remote_fs'] == 'True': xrargs = {key: config[key] for key in ['anon', 'client_kwargs']} xrargs['endpoint_url'] = explorer_url else: os.environ['USE_LOCAL_S3'] = 'TRUE' xrargs = {} return xr.open_dataset(*args, **xrargs, **kwargs) @staticmethod def list_store(explorer_url: str = None, project: str = None, store_name: str = None, directory:str=None) -> list: def scan_data_path(directory: str) -> list: from urllib.request import urlopen if sys.version_info >= (3, 0): return [line.decode('utf-8').rstrip() for line in urlopen(f'{explorer_url}{directory}')] return [line.rstrip() for line in urlopen(f'{explorer_url}{directory}')] explorer_url = explorer_url if explorer_url else os.getenv('TUFTS_CATALOG_URL', "https://topoflowws01.cr.usgs.gov/tufts/catalog/") return sorted([current_col.split('/')[-1] for current_col in scan_data_path(f"{directory}/")]) def _add_store_name(self, store_name:str): """ Basic wrapper for super """ self.attrs['store'] = store_name self.attrs['project'] = store_name self.attrs['data_id'] = store_name self.attrs["processing_software"] = 'xarray' @classmethod def open(cls, store_name: str = None, explorer_url: str = None, project: str = None, directory:str = None) -> Dataset: xrd = super(cls, cls).open(explorer_url=explorer_url, project=project, store_name=store_name, directory=directory) xrd._add_store_name(store_name) return xrd @classmethod @with_s3fs() def from_file(cls, src:str)->Dataset: return xr.open_dataset(src) def to_netcdf(self, *args, **kwargs): """Call helper from xarray module - Use the xarray CapturingSerializer """ return xr.conventions.encode_dataset_coordinates(self).to_netcdf(*args, **kwargs) def to_rio(self, outname:str, *args, **kwargs): with rio.open(outname, "w", **_file_get_meta(self)) as dest: dest.write(self.values) return outname def cleanup(self): """Add the netcdf file to the local cleaning process""" return os.unlink(get_local_netcdf(self)) def __repr__(self): return wrap_super_repr(self) def read_gssha_block(name: str, path: str, report=False): """ Reads a Gssha block (i.e., `elev.chn`) Parameters ---------- name : str Name of unit in the block path : str Returns ------- data : np.ndarray of block """ # Handle # Why read 'read' why not just use open? file = read(path) # Find the beginning of the unit file.find(name=name, report=report) # Read the unit and return it as a numpy array data = file.block(name, order="1i", binary=False) # Convert the numpy array to a variable return {"decoder": unit_decoder(data, name.lower())} def tuflow_decoder(array, unit="m", itemsize=4): """ Simple function to custom decode tuflow data Note ----- - Prone to IndexError if the “subdtype” objects do not align. 
Type "|V4" corresponds to float32 (m=4) "|V8" corresponds to float64 (m=8) """ decoder = { "|V4": np.float32, "|V8": np.float64, "|S4": str, "|S8": str, "|S1": str, } shape = array.shape[0] return decoder[get_subdtype(str(array.dtype), array)[0]](array.ravel().tobytes()).reshape(shape, itemsize) # Read the grid file def read_gssha_grid(name: str, path: str, report=False, buffer=False): """ Reads a GSSHA Grid margin error Parameters ---------- name: str Name path: str Returns ------- data """ # Handle file = read(path) # Find the beginning of the unit file.find(name=name, report=report, buffer=buffer) # Read the grid and return it as a numpy array data = file.grid() # Convert the numpy array to a variable return {"decoder": tuflow_decoder(data), "chunks": (250, 250)} def read_gssha_time_series(datafile, units="m"): """ Read GSSHA Excel Based Time Series Functions ---------- - This function takes only file series names for label - See function `tuflow.timeseries.data_decode.decode_obj` for info regarding column names - User after looping thru all ab """ datacvt=(lambda x: list(t_string(x['Date'], None).ravel())) sheets = pd.ExcelFile(datafile).sheet_names data = pd.concat((pd.read_excel(datafile, options=dict(sheet_name=sh), index=0).apply(datacvt, 1).rename({'Date': sh}) for sh in sheets), 1) data.rename(columns=dict([(oldkey, newkey) for oldkey, newkey in zip(list(data.columns), list(data.iloc[0]))]), inplace=True) data0 = data.iloc[1:] data0.set_index([data0.Date.iloc[0]], inplace=True) return data0.iloc[1:]