Skip to content

Incremented Load Flow#13

Open
AdityaKBhadragond14 wants to merge 9 commits intomainfrom
ab/incremented_load_flow
Open

Incremented Load Flow#13
AdityaKBhadragond14 wants to merge 9 commits intomainfrom
ab/incremented_load_flow

Conversation

@AdityaKBhadragond14
Copy link
Copy Markdown
Collaborator

@AdityaKBhadragond14 AdityaKBhadragond14 commented Mar 12, 2024

This PR fixes the incremental load flow in the ETL Pipeline.

Issues and fixes:

  1. Constraint Issue:
    a. fpk_visit_detail_preceding_visit_detail_id this constraint in the visit_detail table was causing problem because it links the visit_detail_id and the preceding_visit_detail_id in the visit_detail_id and causes problem while deleting or updating because the visit_detail id is being used some other entries because of the foreign key reference. To fix this issue added ON DELETE CASCADE to the constraint to make sure all the entries with the references will also be deleted along with that particular entry.
    b. fpk_visit_occurrence_care_site_id, fpk_visit_detail_visit_occurrence_id, fpk_condition_occurrence_visit_occurrence_id, fpk_drug_exposure_visit_occurrence_id, fpk_measurement_visit_occurrence_id, fpk_observation_visit_occurrence_id, fpk_provider_care_site_id same for this constraints as well in the care_site table.

  2. In the custom mappers OrganizationMapper, PractitionerMapper the incremental load flow was not being handled. Added the check to handle that as well.

  3. Also in the CareSiteRepository, VisitDetailRepository and ProviderRepository added queries to delete the entries using fhir_logical_id.

  4. Also removed the part of the code from ResourceOmopReferenceUtils.java class where the update was happening in the fhir gateway which is not required.

  5. In TaskConfiguration added the stepPractitioner and stepPractitionerRole to the incremental load flow.

.next(stepProcessConsent)
.next(stepProcessDiagnosticReport)
.next(stepProcessPractitioners)
.next(stepProcessPractitionerRole)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add step for Provenance resource as well.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provenance step does not exist yet in the codebase. This can be done after merging the appointment mapper PR.

Copy link
Copy Markdown
Collaborator

@SatwikSShanbhag SatwikSShanbhag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for good Explanation of PR
left few comments have a look

@@ -40,25 +40,26 @@ public void beforeStep(StepExecution stepExecution) {
if (bulkload.equals(Boolean.TRUE)) {
log.info("========= Preparing OMOP DB for BulkLoad =========");
truncateDb();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason to move this code inside BulkLoad as I can see even in incremental load we need these setters ?

Copy link
Copy Markdown
Collaborator Author

@AdityaKBhadragond14 AdityaKBhadragond14 Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not moved anything here. Its like this from beginning.

Comment on lines 590 to +592
.next(stepProcessDiagnosticReport)
.next(stepProcessPractitioners)
.next(stepProcessPractitionerRole)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.next(stepProcessDiagnosticReport)
.next(stepProcessPractitioners)
.next(stepProcessPractitionerRole)
.next(stepProcessDiagnosticReport)
.next(stepProcessPractitioners)
.next(stepProcessPractitionerRole)

Comment on lines +102 to +105
ResourceFhirReferenceUtils referenceUtils,
IFhirPath fhirPath,
Boolean bulkload,
DbMappings dbMappings, OmopRepository repositories) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ResourceFhirReferenceUtils referenceUtils,
IFhirPath fhirPath,
Boolean bulkload,
DbMappings dbMappings, OmopRepository repositories) {
ResourceFhirReferenceUtils referenceUtils,
IFhirPath fhirPath,
Boolean bulkload,
DbMappings dbMappings,
OmopRepository repositories) {

Comment on lines +60 to +61
private final OmopRepository repositories;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use EncounterDepartmentCaseMapperServiceImpl to manage repository operations.

String departmentCaseLogicId, String departmentCaseLogicIdentifier) {
if (!Strings.isNullOrEmpty(departmentCaseLogicId)) {
departmentCaseMapperService.deleteExistingDepartmentcaseByLogicalId(departmentCaseLogicId);
repositories.getVisitDetailRepository().deleteEntriesByFhirLogicalId(departmentCaseLogicId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

define a delete function inside EncounterDepartmentCaseMapperServiceImpl and use this operation there

private final Boolean bulkload;
private final DbMappings dbMappings;

private final OmopRepository repositories;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same create a serviceImpl class and add repository operations there

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

Comment on lines +291 to +294
// readerTemplate.update(
// String.format("UPDATE %s SET last_updated_at=? WHERE fhir_id=?", inputTableName),
// LocalDateTime.now().plusDays(1),
// resourceId.substring(4));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please explain more on what was happening before and why is this now commented?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad I had commented it out by mistake.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to discussion we had this was commented before only. but here I can see we are commenting it
please explain more on this. pipeline id failing if we keep this piece of code.

Comment on lines +44 to +47
@Transactional
@Modifying
@Query(value = "DELETE FROM care_Site WHERE fhir_logical_id = :fhir_logical_id", nativeQuery = true)
void deleteCareSiteByLogicalId(@Param("fhir_logical_id") String fhirLogicalId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use directly void deleteByFhirLogicalId(String fhirLogicalId); no need to add query, try this approach .

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use void deleteByFhirLogicalId(String fhirLogicalId); its available by default

Comment on lines +53 to +55
@Transactional
@Modifying
@Query(value = "DELETE FROM visit_detail WHERE visit_detail_id NOT IN (select min(visit_detail_id) from cds_cdm.visit_detail where fhir_logical_id = :fhir_logical_id group by visit_detail_id)", nativeQuery = true)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any specific reason to delete rows which does not have given fhirLogicalId ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This not required can use the method already provided. Removing it.

Comment on lines +20 to +23
@Transactional
@Modifying
@Query(value = "DELETE FROM provider WHERE fhir_logical_id = :fhir_logical_id", nativeQuery = true)
void deleteProviderByLogicId(@Param("fhir_logical_id") String fhirLogicalId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same try this void deleteByFhirLogicalId(String fhirLogicalId);

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleteByFhirLogicalId(String fhirLogicalId) method does not exist in the ProviderRepository that is why I have added this to delete the entry by fhirLogicalId.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its available by default just add and test

Copy link
Copy Markdown
Collaborator

@SatwikSShanbhag SatwikSShanbhag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please have a look at comments. even some of comments from previous review not addressed properly.
thanks

Comment on lines +43 to +55
dbMappings
.getOmopConceptMapWrapper()
.setFindValidSnomedConcept(
repositories.getConceptRepository().findValidConceptId(VOCABULARY_SNOMED));
dbMappings.setFindSnomedRaceStandardMapping(
repositories.getSnomedRaceRepository().getSnomedRaceMap());
dbMappings
.getOmopConceptMapWrapper()
.setFindValidIPRDConcept(
repositories.getConceptRepository().findValidConceptId(VOCABULARY_IPRD));
dbMappings.getOmopConceptMapWrapper().setFindValidWHOConcept(
repositories.getConceptRepository().findValidConceptId(VOCABULARY_WHO)
);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason to move this code inside BulkLoad as I can see even in incremental load we need these setters ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am moving these setters outside the block to do this for both bulkload and incremental load.

Comment on lines +60 to +61
private final OmopRepository repositories;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also remove the variable if not used .

Comment on lines +105 to +106
DbMappings dbMappings,
OmopRepository repositories) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to pass repository here remove it


this.bulkload = bulkload;
this.dbMappings = dbMappings;
this.repositories = repositories;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this assigning of variable

String departmentCaseLogicId, String departmentCaseLogicIdentifier) {
if (!Strings.isNullOrEmpty(departmentCaseLogicId)) {
departmentCaseMapperService.deleteExistingDepartmentcaseByLogicalId(departmentCaseLogicId);
departmentCaseMapperService.deleteExistingDepartmentCaseByFhirLogicalId(departmentCaseLogicId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use previous function why we required this new function we already have deleteExistingDepartmentcaseByLogicalId which does the same thing.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its just renamed reverting back to the previous name to avoid any confusion.


var quantityUnitCodingFormat =
new Coding().setCode(valueQuantity.getCode()).setSystem(valueQuantity.getSystem());
if (valueQuantity.hasSystem()){
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about units ? are they getting stored ?

Comment on lines -48 to -56
/**
* Delete FHIR Encounter resources from OMOP CDM tables using fhir_logical_id
*
* @param fhirLogicalId logical id of the FHIR Encounter resource
*/
public void deleteExistingDepartmentcaseByLogicalId(String fhirLogicalId) {
visitDetailRepository.deleteByFhirLogicalId(fhirLogicalId);
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this has been removed ? the new function which is added below is the same function with different name

Copy link
Copy Markdown
Collaborator Author

@AdityaKBhadragond14 AdityaKBhadragond14 Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not being removed. I have just changed the function name reverting back to avoid confusion.

Comment on lines +44 to +47
@Transactional
@Modifying
@Query(value = "DELETE FROM care_Site WHERE fhir_logical_id = :fhir_logical_id", nativeQuery = true)
void deleteCareSiteByLogicalId(@Param("fhir_logical_id") String fhirLogicalId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use void deleteByFhirLogicalId(String fhirLogicalId); its available by default

Comment on lines +20 to +23
@Transactional
@Modifying
@Query(value = "DELETE FROM provider WHERE fhir_logical_id = :fhir_logical_id", nativeQuery = true)
void deleteProviderByLogicId(@Param("fhir_logical_id") String fhirLogicalId);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its available by default just add and test

Comment on lines +291 to +294
// readerTemplate.update(
// String.format("UPDATE %s SET last_updated_at=? WHERE fhir_id=?", inputTableName),
// LocalDateTime.now().plusDays(1),
// resourceId.substring(4));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to discussion we had this was commented before only. but here I can see we are commenting it
please explain more on this. pipeline id failing if we keep this piece of code.

@AdityaKBhadragond14
Copy link
Copy Markdown
Collaborator Author

  1. Adding units to the measurement table can be a different PR as it there needs to be some discussion done on this.
  2. I have commented the lines in the ResourceOmopReferenceUtils.java class because those are to do with some update in the fhirGatway jdbc which we does not require and will fail in our case because we dont use it.

Copy link
Copy Markdown
Collaborator

@SatwikSShanbhag SatwikSShanbhag left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants