Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions dpnegf/negf/lead_property.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from dpnegf.negf.surface_green import selfEnergy
import logging
import os
import shutil
import tempfile
from dpnegf.utils.constants import Boltzmann, eV2J
import numpy as np
from dpnegf.negf.bloch import Bloch
Expand Down Expand Up @@ -592,7 +594,8 @@ def _get_safe_n_jobs(lead_L, lead_R, requested_n_jobs=-1, max_memory_fraction=0.


def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
self_energy_save_path=None, n_jobs=-1, batch_size=200):
self_energy_save_path=None, n_jobs=-1, batch_size=200,
parallel_backend="loky"):
"""
Computes and saves self-energy matrices for all combinations of k-points and energy values
for left and right leads.
Expand All @@ -618,6 +621,10 @@ def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
Number of parallel jobs to use. Default is -1 (use all available CPUs).
batch_size : int, optional
Number of (k, e) tasks per parallel batch. Default is 200.
parallel_backend : str, optional
Joblib backend used for self-energy calculation. Default is "loky"
to avoid pickling LeadProperty / TorchScript model objects when PyTorch
models are used.

Returns
-------
Expand All @@ -630,6 +637,10 @@ def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
"Self energy files will be saved in lead_L's results_path.")
self_energy_save_path = os.path.join(lead_L.results_path, "self_energy")

self_energy_save_path = os.path.abspath(self_energy_save_path)
os.makedirs(self_energy_save_path, exist_ok=True)
temp_dir = tempfile.mkdtemp(prefix="tmp_self_energy_", dir=self_energy_save_path)

# Calculate safe number of workers based on available memory
# Use first k-point for memory estimation
sample_kpoint = kpoints_grid[0] if len(kpoints_grid) > 0 else None
Expand All @@ -640,31 +651,40 @@ def compute_all_self_energy(eta, lead_L, lead_R, kpoints_grid, energy_grid,
log.info(f"Adjusted n_jobs from {n_jobs} to {safe_n_jobs} due to memory constraints")

total_tasks = [(k, e) for k in kpoints_grid for e in energy_grid]
log.info(f"Using joblib backend='{parallel_backend}' for self-energy calculation.")
if len(total_tasks) <= batch_size:
Parallel(n_jobs=safe_n_jobs, backend="loky")(
delayed(self_energy_worker)(k, e, eta, lead_L, lead_R, self_energy_save_path)
Parallel(n_jobs=safe_n_jobs, backend=parallel_backend)(
delayed(self_energy_worker)(k, e, eta, lead_L, lead_R, temp_dir)
for k, e in total_tasks
)
else:
for i in range(0, len(total_tasks), batch_size):
batch = total_tasks[i:i+batch_size]
Parallel(n_jobs=safe_n_jobs, backend="loky")(
delayed(self_energy_worker)(k, e, eta, lead_L, lead_R, self_energy_save_path)
Parallel(n_jobs=safe_n_jobs, backend=parallel_backend)(
delayed(self_energy_worker)(k, e, eta, lead_L, lead_R, temp_dir)
for k, e in batch
)


save_path_L = os.path.join(self_energy_save_path, "self_energy_leadL.h5")
save_path_R = os.path.join(self_energy_save_path, "self_energy_leadR.h5")

merge_hdf5_files(self_energy_save_path, save_path_L, pattern="tmp_leadL_*.h5")
merge_hdf5_files(self_energy_save_path, save_path_R, pattern="tmp_leadR_*.h5")
try:
merge_hdf5_files(temp_dir, save_path_L, pattern="tmp_leadL_*.h5")
merge_hdf5_files(temp_dir, save_path_R, pattern="tmp_leadR_*.h5")
except Exception as exc:
raise RuntimeError(
f"Failed to merge self-energy temporary files from '{temp_dir}'. "
"Temporary files were preserved for debugging."
) from exc

shutil.rmtree(temp_dir)





def self_energy_worker(k, e, eta, lead_L, lead_R, self_energy_save_path):
def self_energy_worker(k, e, eta, lead_L, lead_R, temp_dir):
"""
Calculates the self-energy for left and right leads at a given k-point and energy,
and saves the results to HDF5 files.
Expand All @@ -681,17 +701,17 @@ def self_energy_worker(k, e, eta, lead_L, lead_R, self_energy_save_path):
The left lead object, which must implement a `self_energy_cal` method.
lead_R : object
The right lead object, which must implement a `self_energy_cal` method.
self_energy_save_path : str
Directory path where the self-energy HDF5 files will be saved.
temp_dir : str
Directory path where the temporary self-energy HDF5 shard files will be saved.

Returns
-------
None
The function saves the calculated self-energies to files and does not return anything.
"""

save_tmp_L = os.path.join(self_energy_save_path, f"tmp_leadL_k{k[0]}_{k[1]}_{k[2]}_E{e:.8f}.h5")
save_tmp_R = os.path.join(self_energy_save_path, f"tmp_leadR_k{k[0]}_{k[1]}_{k[2]}_E{e:.8f}.h5")
save_tmp_L = os.path.join(temp_dir, f"tmp_leadL_k{k[0]}_{k[1]}_{k[2]}_E{e:.8f}.h5")
save_tmp_R = os.path.join(temp_dir, f"tmp_leadR_k{k[0]}_{k[1]}_{k[2]}_E{e:.8f}.h5")

seL = lead_L.self_energy_cal(kpoint=k, energy=e, eta_lead=eta)
seR = lead_R.self_energy_cal(kpoint=k, energy=e, eta_lead=eta)
Expand Down
35 changes: 31 additions & 4 deletions dpnegf/runner/NEGF.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ def __init__(self,
self.kpoints,self.wk = kmesh_sampling_negf(self.stru_options["kmesh"],
self.stru_options["gamma_center"],
self.stru_options["time_reversal_symmetry"])

if 'override_overlap' in kwargs:
assert isinstance(kwargs['override_overlap'], str)
self.override_overlap = kwargs['override_overlap']
Comment on lines +113 to +115
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

override_overlap=False cannot pass through runner validation.

Line 114 asserts str, which rejects the explicit False disable behavior supported downstream.

Suggested fix
-        if 'override_overlap' in kwargs:
-            assert isinstance(kwargs['override_overlap'], str)
+        if 'override_overlap' in kwargs:
+            assert isinstance(kwargs['override_overlap'], (str, bool, type(None)))
             self.override_overlap = kwargs['override_overlap']
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if 'override_overlap' in kwargs:
assert isinstance(kwargs['override_overlap'], str)
self.override_overlap = kwargs['override_overlap']
if 'override_overlap' in kwargs:
assert isinstance(kwargs['override_overlap'], (str, bool, type(None)))
self.override_overlap = kwargs['override_overlap']
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@dpnegf/runner/NEGF.py` around lines 113 - 115, The validation currently
asserts override_overlap is a str which rejects the valid False boolean; update
the check in the __init__ (or where override_overlap is read) so it accepts
either False or a str (e.g. use isinstance(kwargs['override_overlap'], (str,
bool)) or explicitly allow kwargs['override_overlap'] is False), then assign
self.override_overlap = kwargs['override_overlap'] unchanged; reference the
override_overlap handling around the current assert and self.override_overlap
assignment.

log.info(msg="Using external calculated overlap overriding!")
else:
self.override_overlap = None

if 'parallel_options' in kwargs:
assert isinstance(kwargs['parallel_options'], dict)
self.parallel_options = kwargs['parallel_options']
log.info(msg="Read parallel options from input!")
else:
self.parallel_options = {
"n_job": -1,
"batch_size": 200,
"backend": "loky"
}
Comment on lines +125 to +129
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Default parallel options will crash with KeyError.

Line 126 defines n_job, but Lines 564 and 571 read n_jobs. If parallel_options is not provided, this path fails at runtime.

Suggested fix
         else:
             self.parallel_options = {
-                "n_job": -1,
+                "n_jobs": -1,
                 "batch_size": 200,
                 "backend": "loky"
             }

Also applies to: 564-565, 571-572

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@dpnegf/runner/NEGF.py` around lines 125 - 129, The default parallel_options
dict uses the wrong key "n_job" which causes KeyError when code later accesses
self.parallel_options["n_jobs"]; update the default to use the correct key name
"n_jobs" (i.e., set "n_jobs": -1) or alternatively change the later accesses to
read "n_job" consistently, but prefer changing the default so
self.parallel_options and all reads use "n_jobs"; ensure any other keys (e.g.,
"batch_size", "backend") remain unchanged.

log.info(msg="No parallel options Input, using default settings!")

log.info(msg="------ k-point for NEGF -----")
log.info(msg="Gamma Center: {0}".format(self.stru_options["gamma_center"]))
log.info(msg="Time Reversal: {0}".format(self.stru_options["time_reversal_symmetry"]))
Expand Down Expand Up @@ -168,7 +188,7 @@ def __init__(self,
e_fermi = {}; chemiPot = {}
# calculate Fermi level
if self.e_fermi is None:
elec_cal = ElecStruCal(model=model,device=self.torch_device)
elec_cal = ElecStruCal(model=model, device=self.torch_device, override_overlap=self.override_overlap)
nel_atom_lead = self.get_nel_atom_lead(
struct_leads,
charge={lead_tag: self.stru_options[lead_tag].get("charge", 0) for lead_tag in ["lead_L", "lead_R"]}
Expand Down Expand Up @@ -522,7 +542,8 @@ def prepare_self_energy(self, scf_require: bool) -> None:
# self energy calculation
log.info(msg="------Self-energy calculation------")
if self.self_energy_save_path is None:
self.self_energy_save_path = os.path.join(self.results_path, "self_energy")
self.self_energy_save_path = os.path.join(self.results_path, "self_energy")
self.self_energy_save_path = os.path.abspath(self.self_energy_save_path)
os.makedirs(self.self_energy_save_path, exist_ok=True)

if self.use_saved_se:
Expand All @@ -539,11 +560,17 @@ def prepare_self_energy(self, scf_require: bool) -> None:
# self.deviceprop.lead_L.self_energy(kpoint=k, energy=e, eta_lead=self.eta_lead, save=True)
# self.deviceprop.lead_R.self_energy(kpoint=k, energy=e, eta_lead=self.eta_lead, save=True)
compute_all_self_energy(self.eta_lead, self.deviceprop.lead_L, self.deviceprop.lead_R,
self.kpoints, self.density.integrate_range, self.self_energy_save_path)
self.kpoints, self.density.integrate_range, self.self_energy_save_path,
n_jobs=self.parallel_options["n_jobs"],
batch_size=self.parallel_options["batch_size"],
parallel_backend=self.parallel_options["backend"])
elif not self.scf:
# In non-scf case, the self-energy of the leads is calculated for each energy point in the energy grid.
compute_all_self_energy(self.eta_lead, self.deviceprop.lead_L, self.deviceprop.lead_R,
self.kpoints, self.uni_grid, self.self_energy_save_path)
self.kpoints, self.uni_grid, self.self_energy_save_path,
n_jobs=self.parallel_options["n_jobs"],
batch_size=self.parallel_options["batch_size"],
parallel_backend=self.parallel_options["backend"])
log.info(msg="-----------------------------------\n")


Expand Down
Loading
Loading