Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 41 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Dependency notes:

2. If you'd like to train the model on GPU, make sure you install TensorFlow with GPU setup in your environment. More information can be found [here](https://www.tensorflow.org/install/pip#:~:text=4.-,Install%20TensorFlow,-TensorFlow%20requires%20a).

3. Currently waiting on `TesorFlow` to support Python 3.13 before we can support it as well.
3. Currently waiting on `TensorFlow` to support Python 3.13 before we can support it as well.

### Install Required Dependencies Only
For clean dependency management, use a virtual environment or a fresh Conda environment.
Expand All @@ -53,26 +53,47 @@ To uninstall the package, run:
pip uninstall das_anomaly
```

## Instructions
The main steps for using the package are as follows:
## Autoencoder Model Architecture

The package implements a convolutional autoencoder designed to compress and reconstruct power spectral density (PSD) inputs.

- **Encoder:** A lightweight convolutional neural network reduces the input dimensionality, mapping it into a compact latent space.
- **Decoder:** A symmetric decoder reconstructs the data by upsampling the latent representation back to the original resolution.

<p align="center">
<img src="docs/figures/AE-architecture.jpg" alt="Autoencoder architecture" width="1000">
</p>

---

## Usage Workflow

The overall workflow for using the package is illustrated below:

<p align="center">
<img src="docs/figures/AE-flowchart.jpg" alt="Workflow flowchart" width="900">
</p>

The main steps are:
1. Define constants and create a Spool of data:
Using the _config_user_ script in the das_anomaly directory, define the constants and directory paths for data, power spectral density (PSD) images, detected anomaly results, etc. You would complete adding the values as you go over the steps mentioned below.

Then, using DASCore, create an index file for the [spool](https://dascore.org/tutorial/spool.html) of data first time reading the DAS data directory:
Using the _config_user_ script in the das_anomaly directory, define the constants and directory paths for data, power spectral density (PSD) images, detected anomaly results, etc. You would complete adding the values as you go over the steps mentioned below. Then, using DASCore, create an index file for the [spool](https://dascore.org/tutorial/spool.html) of data first time reading the DAS data directory:

### Example
```python
import dascore as dc
from das_anomaly.settings import SETTINGS

data_path = SETTINGS.DATA_PATH

# Update will create an index of the contents for fast querying/access
# Update will create an index of the contents for fast querying/access. No need to apply update() in future.
spool = dc.spool(directory_path).update()
```
Note: Creating the spool for the first time may take some time if your directory contains hundreds of gigabytes or terabytes of DAS data. However, DASCore creates an index file, allowing it to quickly query the directory on subsequent accesses.

2. Set a consistent upper bound for PSD amplitude values:
To ensure all PSD images share the same colorbar scale, determine an appropriate CLIP_VALUE_MAX in the _config_user_ script. This can be done using the `get_psd_max_clip` function, which computes the mean value of maximum amplitude from TIME_WINDOWs of the data which does not include drastic anomalies (therefore, a quick exploratory data analysis is needed here.)

To ensure all PSD images share the same colorbar scale (in RGB), determine an appropriate CLIP_VALUE_MAX in the _config_user_ input file. This can be done using the `get_psd_max_clip` function, which computes the mean value of maximum amplitude from TIME_WINDOWs of the data which does not include obvious anomalies (therefore, a quick exploratory data analysis is needed here.)
### Example
```python
from das_anomaly.psd import PSDConfig, PSDGenerator
Expand All @@ -82,11 +103,12 @@ from das_anomaly.settings import SETTINGS
bn_data_path = SETTINGS.BN_DATA_PATH
cfg = PSDConfig(data_path=bn_data_path)
gen = PSDGenerator(cfg)
percentile = 90
percentile = 90 # data dependent
clip_val = gen.run_get_psd_val(percentile=percentile)
print(f"Mean {percentile}-percentile amplitude across all patches: {clip_val:.3e}")
```
3. Generate PSD plots:

Use the `das_anomaly.psd` module and create PSD plots in RGB format and in plain mode (with no axes or colorbar). The `das_anomaly.psd.PSDGenerator reads DAS data, creates a spool using DASCore library, applies a detrend function to each patch of the chunked spool, and then average the energy over a desired time window and stack all channels together to create a spatial PSD with channels on the X-axis and frequency on the Y-axis. You can use MPI to distribute reading data and plotting PSDs over CPUs.
### Example
```python
Expand All @@ -110,10 +132,12 @@ PSDGenerator(cfg).run()
PSDGenerator(cfg).run_parallel()
```
4. Select and copy known anomaly PSD plots:
From the generated PSD plots, identify and copy examples of known anomalies to the ANOMALY_IMAGES_PATH specified in the _config_user_ script. These anomalies can include events such as earthquakes from an existing catalog, seismic activity, instrument noise, anthropogenic disturbances, etc. Including these examples helps improve the accuracy of thresholding during the anomaly detection process.

From the generated PSD plots, identify and copy examples of known anomalies to the ANOMALY_IMAGES_PATH specified in the _config_user_ input script. These anomalies can include events such as earthquakes from an existing catalog, instrument noise, anthropogenic disturbances, etc. Including these examples helps improve thresholding during the detection process.

5. Train:
The `das_anomaly.train` module helps with randomly selecting train and test PSD images and training the model (with CPU or GPU) on anomaly-free PSD images. If you need to change model's architecture, you'll need to modify the `encoder` and `decoder` functions in the [utils.py](das_anomaly/utils.py).

The `das_anomaly.train` module helps with randomly selecting train and test PSD images and training the model (with CPU or GPU) on anomaly-free PSD images.
### Example
```python
from das_anomaly.settings import SETTINGS
Expand All @@ -127,13 +151,15 @@ ImageSplitter(cfg).run()
cfg = TrainAEConfig()
AutoencoderTrainer(cfg).run()
```
Note: Since the `TrainSplitConfig()` function randomly selects PSD images from the generated plots, you must ensure the training and testing datasets do not include anomalies. If you have an excel sheet with time stamp of anomalies, use "exclude_known_events_from_training" in examples directory to exclude them. Or, manually inspect both the training and testing sets to ensure they do not contain apparent anomalies. Review their time- and frequency-domain representations, and remove any suspicious samples to maintain the quality of training.
Note: Since the `TrainSplitConfig()` function randomly selects PSD images from the generated plots, you must ensure the training and testing datasets do not include obvious anomalies. If you have an excel sheet with time stamp of anomalies (such as a catalog), use the "exclude_known_events_from_training" in examples directory to exclude them. Or, manually inspect both the training and testing sets to ensure they do not contain apparent anomalies. Review their time- and frequency-domain representations, and remove any suspicious samples to maintain the quality of training.

6. Test and set thresholds:

6. Test and set a threshold:
Using the _validate_and_plot_density_ jupyter notebook in the examples directory, validate the trained model and find an appropriate density score as a threshold for anomaly detection. Then, make sure to modify the DENSITY_THRESHOLD parameter in the _config_user_ script.
Using the _validate_and_plot_density_ and _thresholding_f_score_ jupyter notebooks in the examples directory, validate the trained model and find appropriate MSE and density score as thresholds for anomaly detection. Make sure to modify the DENSITY_THRESHOLD and MSE_THRESHOLD parameters in the _config_user_ script.

7. Run the trained model:
The `das_anomaly.detect` module applies the trained model to the data to detect anomalies in the PSD images and writes their information. It also copies the detected anomaly to the RESULTS_PATH. MPI can be used to distribute PSDs over CPUs. Then, using the `das_anomaly.count` module, count the number of detected anomalies and display their details and file paths.

The `das_anomaly.detect` module uses the trained model to detect anomalies in the PSD images and writes their information (e.g., time stamp). It also copies the detected anomaly to the RESULTS_PATH. MPI can be used to distribute PSDs over CPUs. Then, using the `das_anomaly.count` module, count the number of detected anomalies and display their details and file paths.
### Example
```python
from das_anomaly.count.counter import CounterConfig, AnomalyCounter
Expand All @@ -145,7 +171,7 @@ AnomalyDetector(cfg).run()
# parallel processing with multiple processors using MPI:
AnomalyDetector(cfg).run_parallel()

# count number of anomalies
# count number of detected anomalies
cfg = CounterConfig(keyword="anomaly")
anomalies = AnomalyCounter(cfg).run()
print(anomalies) # prints info on number of anomalies and path to them
Expand Down
Binary file added docs/figures/AE-architecture.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/figures/AE-flowchart.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 18 additions & 0 deletions examples/bash_jobs/detect_mpi.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
#SBATCH --ntasks=24
#SBATCH -t 24:00:00
#SBATCH -A YOUR_ACCOUNT
#SBATCH --mem-per-cpu=128G

# Print start time
echo "Job started at: $(date)"

# Load modules and environment
module load openmpi/gcc/64/4.1.5
source activate dasanomaly

# Recommended in a SLURM environment: use srun, not mpirun
mpirun -n $SLURM_NTASKS python -u detect_parallel.py

# Print end time
echo "Job ended at: $(date)"
4 changes: 4 additions & 0 deletions examples/bash_jobs/detect_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from das_anomaly.detect import AnomalyDetector, DetectConfig

cfg = DetectConfig()
AnomalyDetector(cfg).run_parallel()
22 changes: 22 additions & 0 deletions examples/bash_jobs/psd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
#SBATCH -t 01:00:00
#SBATCH -A YOUR_ACCOUNT
#SBATCH --mem-per-cpu=16G

# Print start time
echo "Job started at: $(date)"

# Load modules and environment
source activate dasanomaly

# Run the script
python << EOF
from das_anomaly.psd import PSDConfig, PSDGenerator

cfg = PSDConfig()
# serial processing with single processor:
PSDGenerator(cfg).run()
EOF

# Print end time
echo "Job ended at: $(date)"
18 changes: 18 additions & 0 deletions examples/bash_jobs/psd_mpi.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
#SBATCH --ntasks=24
#SBATCH -t 6:00:00
#SBATCH -A YOUR_ACCOUNT
#SBATCH --mem-per-cpu=16G

# Print start time
echo "Job started at: $(date)"

# Load modules and environment
module load openmpi/gcc/64/4.1.5
source activate dasanomaly

# Recommended in a SLURM environment: use srun, not mpirun
mpirun -n $SLURM_NTASKS python -u psd_parallel.py

# Print end time
echo "Job ended at: $(date)"
5 changes: 5 additions & 0 deletions examples/bash_jobs/psd_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from das_anomaly.psd import PSDConfig, PSDGenerator

cfg = PSDConfig()
# parallel processing with multiple processors using MPI:
PSDGenerator(cfg).run_parallel()
23 changes: 23 additions & 0 deletions examples/bash_jobs/train.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash
#SBATCH -p h200_normal_q
#SBATCH --gres=gpu:1
#SBATCH --qos=tc_h200_normal_short
#SBATCH -t 1-00:00:00
#SBATCH -A YOUR_ACCOUNT

# Print start time
echo "Job started at: $(date)"

# Load modules and environment
source activate dasanomaly

# Run the script
python <<'PY'
from das_anomaly.train import TrainAEConfig, AutoencoderTrainer

cfg = TrainAEConfig()
AutoencoderTrainer(cfg).run()
PY

# Print end time
echo "Job ended at: $(date)"
162 changes: 162 additions & 0 deletions examples/plot_waterfall.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "ce378089",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"from pathlib import Path\n",
"\n",
"import dascore as dc\n",
"\n",
"from das_anomaly.settings import SETTINGS"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fcf9eed5",
"metadata": {},
"outputs": [],
"source": [
"txt_path = SETTINGS.RESULTS_PATH\n",
"\n",
"waterfall_path = \"path/for/waterfall/plots\"\n",
"waterfall_dir = Path(waterfall_path)\n",
"waterfall_dir.mkdir(parents=True, exist_ok=True) # creates it if it doesn't exist\n",
"\n",
"data_path = SETTINGS.DATA_PATH\n",
"data_unit = SETTINGS.DATA_UNIT\n",
"\n",
"min_freq = SETTINGS.MIN_FREQ \n",
"max_freq = SETTINGS.MAX_FREQ \n",
"step_multiple = SETTINGS.STEP_MULTIPLE \n",
"start_channel = SETTINGS.START_CHANNEL \n",
"end_channel = SETTINGS.END_CHANNEL \n",
"time_window = SETTINGS.TIME_WINDOW\n",
"time_overlap = SETTINGS.TIME_OVERLAP "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9fb4b280",
"metadata": {},
"outputs": [],
"source": [
"# Read one path per line\n",
"s = pd.read_csv(txt_path, header=None, names=[\"path\"], dtype=str)\n",
"\n",
"# Extract timestamps\n",
"ts = s[\"path\"].str.extract(\n",
" r'(?P<time_min>\\d{4}_\\d{2}_\\d{2}T\\d{2}_\\d{2}_\\d{2})__'\n",
" r'(?P<time_max>\\d{4}_\\d{2}_\\d{2}T\\d{2}_\\d{2}_\\d{2})'\n",
")\n",
"\n",
"# Build DataFrame with 1-based row numbers\n",
"df = pd.concat([s.index.to_series().add(1).rename(\"rows\"), ts], axis=1)\n",
"\n",
"print(df.head())"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "00ce78d2",
"metadata": {},
"outputs": [],
"source": [
"def _distance_slice(patch):\n",
" \"\"\"Convert channel range to distance range.\"\"\"\n",
" start = patch.coords[\"distance\"][start_channel]\n",
" end = patch.coords[\"distance\"][end_channel]\n",
" return (start, end)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "368e1837",
"metadata": {},
"outputs": [],
"source": [
"sp = dc.spool(data_path)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b53799e9",
"metadata": {},
"outputs": [],
"source": [
"\n",
"select_time = True\n",
"\n",
"for row in df.itertuples(index=True): # index=False if you don't need the index\n",
" i = row.Index # row number (DataFrame index)\n",
" t1 = pd.to_datetime(row.time_min, format=\"%Y_%m_%dT%H_%M_%S\") \n",
" t2 = pd.to_datetime(row.time_max, format=\"%Y_%m_%dT%H_%M_%S\")\n",
" \n",
" if i>1:\n",
" break\n",
"\n",
" if select_time:\n",
" sub_sp_time = sp.select(time=(t1, t2))\n",
" sub_sp_time_distance = sub_sp_time.select(distance=(_distance_slice(sub_sp_time[0])))\n",
" sub_sp = sub_sp_time_distance\n",
" else:\n",
" sub_sp_distance = sp.select(distance=(_distance_slice(sp[0]))) \n",
" sub_sp = sub_sp_distance\n",
" # chunk into windowed sub-patches\n",
" sub_sp_chunked = sub_sp.sort(\"time\").chunk(\n",
" time=time_window, keep_partial=True,\n",
" )\n",
" if len(sub_sp_chunked) == 0:\n",
" raise ValueError(\"No patch of DAS data found within data path: %s\")\n",
" # iterate over patches and perform preprocessing\n",
" for pa_num, patch in enumerate(sub_sp_chunked):\n",
" if data_unit == \"velocity\":\n",
" proc_patch = patch.velocity_to_strain_rate_edgeless(\n",
" step_multiple=step_multiple\n",
" ).detrend(\"time\")\n",
" elif data_unit == \"strain_rate\":\n",
" proc_patch = patch.detrend(\"time\")\n",
" else:\n",
" raise ValueError(\n",
" \"Unsupported data_unit: {data_unit!r}. \"\n",
" \"Expected 'velocity' or 'strain_rate'.\"\n",
" )\n",
" ax = proc_patch.viz.waterfall(show=False, scale=(-2.5e-6,2.5e-6))\n",
" fig = ax.get_figure() # or: fig = ax.figure\n",
" name = Path(s[\"path\"].iloc[i]).stem\n",
" fig.savefig(waterfall_dir / f\"{name}_{pa_num}.png\", dpi=300, bbox_inches=\"tight\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "dasanomaly",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Loading
Loading