diff --git a/talend_component/tRunTask/tRunTask_begin.javajet b/talend_component/tRunTask/tRunTask_begin.javajet index 522ddec..2dc187f 100644 --- a/talend_component/tRunTask/tRunTask_begin.javajet +++ b/talend_component/tRunTask/tRunTask_begin.javajet @@ -1,315 +1,338 @@ -<%@ jet - imports=" - org.talend.core.model.process.INode - org.talend.core.model.process.ElementParameterParser - org.talend.designer.codegen.config.CodeGeneratorArgument - java.util.List - java.util.Map - " -%> -<% - CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument; - INode node = (INode) codeGenArgument.getArgument(); - String cid = node.getUniqueName(); - String mode = ElementParameterParser.getValue(node, "__MODE__"); - boolean useTaskName = "true".equals(ElementParameterParser.getValue(node, "__USE_TASK_NAME__")); - boolean dieOnError = "true".equals(ElementParameterParser.getValue(node, "__DIE_ON_ERROR__")); - boolean jobNameIsTaskName = "true".equals(ElementParameterParser.getValue(node, "__JOB_NAME_IS_TASK_NAME__")); - String taskName = ElementParameterParser.getValue(node, "__TASK_NAME__"); - String jobName = ElementParameterParser.getValue(node, "__PROCESS__"); - if (jobNameIsTaskName) { - taskName = "\"" + jobName + "\""; - } - String debug = ElementParameterParser.getValue(node, "__DEBUG__"); - String taskId = ElementParameterParser.getValue(node, "__TASK_ID__"); - String tacUrl = ElementParameterParser.getValue(node, "__TAC_URL__"); - String tacUser = ElementParameterParser.getValue(node, "__TAC_USER__"); - String tacPasswd = ElementParameterParser.getValue(node, "__TAC_PASSWD__"); - boolean asynchron = "true".equals(ElementParameterParser.getValue(node, "__ASYNCHRONOUS_RUN__")); - @SuppressWarnings("unchecked") - List> contextParams = (List>) ElementParameterParser.getObjectValue(node, "__CONTEXT_PARAMS__"); - String waitTimeUntilRunning = ElementParameterParser.getValue(node, "__WAIT_UNTIL_RUNNING_CHECK_CYCLE__"); - String timeoutUntilRunning = ElementParameterParser.getValue(node, "__WAIT_UNTIL_RUNNING_TIMEOUT__"); - String waitUntilFinished = ElementParameterParser.getValue(node, "__WAIT_UNTIL_FINISHED_CHECK_CYCLE__"); - boolean checkTaskRunning = "true".equals(ElementParameterParser.getValue(node, "__CHECK_TASK_NOT_RUNNING_SIMULTANEOUSLY__")); - @SuppressWarnings("unchecked") - List> listTasks = (List>) ElementParameterParser.getObjectValue(node, "__TASK_NOT_RUNNING_SIMULTANEOUSLY__"); - boolean waitForTaskEnd = "true".equals(ElementParameterParser.getValue(node, "__WAIT_UNTIL_END__")); - boolean allowPreparing = "true".equals(ElementParameterParser.getValue(node, "__ALLOW_GENERATING__")); - String maxRepetitions = ElementParameterParser.getValue(node, "__MAX_REPETITION__"); - String waitTimeBetweenRepetition = ElementParameterParser.getValue(node, "__WAIT_MILLIS_BETWEEN_REPETITION__"); -%> - de.cimt.talendcomp.tac.TACConnection conn_<%=cid%> = new de.cimt.talendcomp.tac.TACConnection(<%=tacUrl%>); - conn_<%=cid%>.setUser(<%=tacUser%>); - conn_<%=cid%>.setPassword(<%=tacPasswd%>); -<% if (maxRepetitions != null && maxRepetitions.trim().isEmpty() == false) { %> - conn_<%=cid%>.setMaxRepeats(<%=maxRepetitions%>); -<% } %> -<% if (waitTimeBetweenRepetition != null && waitTimeBetweenRepetition.trim().isEmpty() == false) { %> - conn_<%=cid%>.setRepeatWaitTime(<%=waitTimeBetweenRepetition%>); -<% } %> - try { - String taskId_<%=cid%> = null; -<% if (useTaskName) { %> - String label_<%=cid%> = <%=taskName.trim()%>; - { - // retrieve the taskId by task name - de.cimt.talendcomp.tac.GetTaskIdByName taskIdByName = new de.cimt.talendcomp.tac.GetTaskIdByName(conn_<%=cid%>); - taskIdByName.setDebug(<%=debug%>); - try { - taskId_<%=cid%> = taskIdByName.getTaskId(<%=taskName.trim()%>); - taskIdByName.info("Task " + <%=taskName.trim()%> + ": has id: " + taskId_<%=cid%>); - } catch (Exception e) { - globalMap.put("<%=cid%>_ERROR_MESSAGE", "Retrieve taskId failed: " + e.getMessage()); - throw e; - } - } -<% } else { %> - taskId_<%=cid%> = String.valueOf(<%=taskId%>); - String label_<%=cid%> = taskId_<%=cid%>; -<% } %> - globalMap.put("<%=cid%>_TASK_ID", taskId_<%=cid%>); -<% if ("status".equals(mode)) { %> - // check the last status - { - de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); - status.setDebug(<%=debug%>); - status.setTaskId(taskId_<%=cid%>); - try { - status.execute(); - globalMap.put("<%=cid%>_STATUS", status.getStatus()); - globalMap.put("<%=cid%>_ERROR_STATUS", status.getErrorStatus()); - globalMap.put("<%=cid%>_HAS_ERRORS", status.hasErrors()); - globalMap.put("<%=cid%>_IS_READY_TO_RUN", status.isReadyToRun()); - globalMap.put("<%=cid%>_IS_PREPARING", status.isPreparing()); - globalMap.put("<%=cid%>_IS_RUNNING", status.isRunning()); - } catch (Exception e) { - String message = "Get task status for task:" + label_<%=cid%> + " failed:" + e.getMessage(); - globalMap.put("<%=cid%>_ERROR_MESSAGE", message); - throw e; - } - } -<% } else if ("run".equals(mode)) { %> - // run a task - int restartTaskCounter = 0; - startRun: while (true) { // startRun block begin - // check if the task is ready to run - { - de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); - status.setDebug(<%=debug%>); - status.setTaskId(taskId_<%=cid%>); - status.execute(); - if (status.isReadyToRun() == false && status.isRunning() == false) { - // task has any other state than READY_TO_RUN and RUNNING or REQUESTING_TO_RUN - // this could mean it is not deployed or is generating or need a generate - String message = "Task " + label_<%=cid%> + " is not not ready to run. Status:" + status.getStatus() + " ErrorStatus:" + status.getErrorStatus(); -<% if (allowPreparing) { %> - if (status.isPreparing() && status.isInReadyState() == false) { - // task is preparing - status.info(status.getTimeAsString() + "# Task " + label_<%=cid%> + " is preparing."); - while (status.isPreparing() && status.isInReadyState() == false) { - Thread.sleep(10000); // wait 10s - status.execute(); // check again - } - if (status.isPreparing()) { - status.info(status.getTimeAsString() + "# Task " + label_<%=cid%> + " has finished a preparing step."); - } else { - status.info(status.getTimeAsString() + "# Task " + label_<%=cid%> + " has finished preparing."); - } - } -<% } else { %> - // the task is not ready to run - globalMap.put("<%=cid%>_ERROR_MESSAGE", message); - // we have to die here - throw new Exception(message); -<% } %> - } - globalMap.put("<%=cid%>_STATUS", status.getStatus()); - globalMap.put("<%=cid%>_ERROR_STATUS", status.getErrorStatus()); - globalMap.put("<%=cid%>_HAS_ERRORS", status.hasErrors()); - globalMap.put("<%=cid%>_IS_READY_TO_RUN", status.isReadyToRun()); - globalMap.put("<%=cid%>_IS_PREPARING", status.isPreparing()); - globalMap.put("<%=cid%>_IS_RUNNING", status.isRunning()); - } -<% if (checkTaskRunning || waitForTaskEnd) { %> - { - // wait until tasks are finished (or simply not running) - java.util.List listTasks = new java.util.ArrayList(); - de.cimt.talendcomp.tac.GetTaskIdByName taskIdByName = new de.cimt.talendcomp.tac.GetTaskIdByName(conn_<%=cid%>); - taskIdByName.setDebug(<%=debug%>); - de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); - status.setDebug(<%=debug%>); - try { - boolean wait = true; - boolean firstLoop = true; - while (wait) { - wait = false; - // Check if this task is not currently running - status.setTaskId(taskId_<%=cid%>); - status.execute(); - if (status.isRunning() || status.isReadyToRun() == false) { - if (firstLoop) { - firstLoop = false; - status.info("Task " + label_<%=cid%> + ": Own task is currently started or running or not ready. Wait for end of current processing."); - } - wait = true; - Thread.sleep(1000); - continue; // no need to check another task if our task is not ready - } - Thread.sleep(100); // wait a bit to avoid over running the TAC - // wait for possible more tasks -<% for (Map taskMap : listTasks) { - String taskToWait = taskMap.get("TASK_NAME"); - if (taskToWait != null && taskToWait.trim().isEmpty() == false) { - taskToWait = taskToWait.replace('\"',' ').trim(); %> - status.setTaskId(taskIdByName.getTaskId("<%=taskToWait%>")); - status.execute(); - if (status.isRunning()) { - wait = true; - Thread.sleep(1000); - continue; // one of the necessary task is currently running, no need to check another task - } - Thread.sleep(100); // wait a bit to avoid over running the TAC -<% } %> -<% } %> - } // end while - status.info("Task " + label_<%=cid%> + ": All precondition tasks are finished."); - } catch (Exception e) { - globalMap.put("<%=cid%>_ERROR_MESSAGE", "Wait for task failed: " + e.getMessage()); - throw e; - } - } -<% } %> - de.cimt.talendcomp.tac.RunTask runTask_<%=cid%> = new de.cimt.talendcomp.tac.RunTask(conn_<%=cid%>); - runTask_<%=cid%>.setDebug(<%=debug%>); - runTask_<%=cid%>.setTaskId(taskId_<%=cid%>); -<% for (Map contextParam : contextParams) { - String paramName = contextParam.get("NAME"); - String paramValue = contextParam.get("VALUE"); - if (paramName != null && paramName.trim().isEmpty() == false && paramValue != null && paramValue.trim().isEmpty() == false) { %> - runTask_<%=cid%>.addContextParam("<%=paramName.trim()%>", <%=paramValue.trim()%>); -<% } %> -<% } %> - runTask_<%=cid%>.setSynchronous(<%=(false == asynchron)%>); - long startTime = System.currentTimeMillis(); - long stopTime = 0; - try { - runTask_<%=cid%>.info(runTask_<%=cid%>.getTimeAsString() + "# Task " + label_<%=cid%> + " starting ..."); - runTask_<%=cid%>.execute(); // start the task - } catch (Exception e) { - if (runTask_<%=cid%>.needRestartWithCheckTaskStatus()) { - runTask_<%=cid%>.info("Task " + label_<%=cid%> + ": Task is still processing (not correclty recognised by the former status check) -> restart the check and run cycle."); - continue startRun; // with startRun loop - } - globalMap.put("<%=cid%>_ERROR_MESSAGE", e.getMessage()); - throw e; - } -<% if (asynchron) { %> - // simplyfied reliable request handling since Talend release 5.6 - int waitUntilFinished = <%=waitUntilFinished%>; - de.cimt.talendcomp.tac.GetTaskExecutionStatus tes = new de.cimt.talendcomp.tac.GetTaskExecutionStatus(conn_<%=cid%>); - tes.setDebug(<%=debug%>); - tes.setExecRequestId(runTask_<%=cid%>.getExecRequestId()); - tes.execute(); - // by MM @ FLE 2017-10-17 - while ( !tes.getExecBasicStatus().equals("OK") && - !tes.getExecBasicStatus().equals("ERROR") && - !tes.getExecBasicStatus().equals("KILLED")) { - if (tes.isPreparing()) { - System.out.println("Task is " + tes.getExecDetailedStatus()); - } - Thread.sleep(waitUntilFinished); - tes.execute(); - } - tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": status:" + tes.getExecBasicStatus()); - tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": detailedStatus:" + tes.getExecDetailedStatus()); - Integer returnCode = tes.getJobExitCode(); - tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": returnCode:" + returnCode); - globalMap.put("<%=cid%>_RETURN_CODE", returnCode); - // check the last status again - { - de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); - status.setDebug(<%=debug%>); - status.setTaskId(taskId_<%=cid%>); - try { - status.execute(); - globalMap.put("<%=cid%>_STATUS", status.getStatus()); - globalMap.put("<%=cid%>_ERROR_STATUS", status.getErrorStatus()); - globalMap.put("<%=cid%>_HAS_ERRORS", status.hasErrors()); - globalMap.put("<%=cid%>_IS_READY_TO_RUN", status.isReadyToRun()); - globalMap.put("<%=cid%>_IS_PREPARING", status.isPreparing()); - globalMap.put("<%=cid%>_IS_RUNNING", status.isRunning()); - } catch (Exception e) { - String message = "Get task status for task:" + label_<%=cid%> + " failed:" + e.getMessage(); - globalMap.put("<%=cid%>_ERROR_MESSAGE", message); - throw e; - } - } - // by MM @ FLE 2017-10-17 - if (tes.getExecBasicStatus().equals("KILLED")) { - throw new Exception("Child job running killed by user."); - } -<% if (dieOnError) { %> - if (returnCode != null && returnCode.intValue() != 0) { - errorCode = returnCode; - globalMap.put("<%=cid%>_ERROR_MESSAGE", "Task " + label_<%=cid%> + ": Execute task failed. ErrorCode:" + errorCode); - throw new Exception("Child job running failed. ErrorCode:" + errorCode); - } -<% } %> -<% } else { // if (asynchron) %> - stopTime = System.currentTimeMillis(); - // simplyfied reliable request handling since Talend release 5.6 - int waitUntilFinished = <%=waitUntilFinished%>; - de.cimt.talendcomp.tac.GetTaskExecutionStatus tes = new de.cimt.talendcomp.tac.GetTaskExecutionStatus(conn_<%=cid%>); - tes.setDebug(<%=debug%>); - tes.setExecRequestId(runTask_<%=cid%>.getExecRequestId()); - tes.execute(); - tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": status:" + tes.getExecBasicStatus()); - tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": detailedStatus:" + tes.getExecDetailedStatus()); - Integer returnCode = tes.getJobExitCode(); - tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": returnCode:" + returnCode); - globalMap.put("<%=cid%>_RETURN_CODE", returnCode); - // check the last status again - { - de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); - status.setDebug(<%=debug%>); - status.setTaskId(taskId_<%=cid%>); - try { - status.execute(); - globalMap.put("<%=cid%>_STATUS", status.getStatus()); - globalMap.put("<%=cid%>_ERROR_STATUS", status.getErrorStatus()); - globalMap.put("<%=cid%>_HAS_ERRORS", status.hasErrors()); - globalMap.put("<%=cid%>_IS_READY_TO_RUN", status.isReadyToRun()); - globalMap.put("<%=cid%>_IS_PREPARING", status.isPreparing()); - globalMap.put("<%=cid%>_IS_RUNNING", status.isRunning()); - } catch (Exception e) { - String message = "Get task status for task:" + label_<%=cid%> + " failed:" + e.getMessage(); - globalMap.put("<%=cid%>_ERROR_MESSAGE", message); - throw e; - } - } - // by MM @ FLE 2017-10-17 - if (tes.getExecBasicStatus().equals("KILLED")) { - throw new Exception("Child job running killed by user."); - } -<% if (dieOnError) { %> - if (returnCode != null && returnCode.intValue() != 0) { - errorCode = returnCode; - globalMap.put("<%=cid%>_ERROR_MESSAGE", "Task " + label_<%=cid%> + ": Execute task failed. ErrorCode:" + errorCode); - throw new Exception("Child job running failed. ErrorCode:" + errorCode); - } -<% } %> -<% } // if (asynchron) %> - runTask_<%=cid%>.info("Task " + label_<%=cid%> + ": ready."); - if (stopTime > 0) { // set duration only if we have wait until the end - globalMap.put("<%=cid%>_RUN_DURATION", (stopTime - startTime)); - } - break; - } // startRun block end -<% } else { // mode unknown %> - // unkown mode detected - if (true) throw new IllegalStateException("Operational mode: <%=mode%> is unknown!"); -<% } %> - } finally { - conn_<%=cid%>.close(); - } +<%@ jet + imports=" + org.talend.core.model.process.INode + org.talend.core.model.process.ElementParameterParser + org.talend.designer.codegen.config.CodeGeneratorArgument + java.util.List + java.util.Map + " +%> +<% + CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument; + INode node = (INode) codeGenArgument.getArgument(); + String cid = node.getUniqueName(); + String mode = ElementParameterParser.getValue(node, "__MODE__"); + boolean useTaskName = "true".equals(ElementParameterParser.getValue(node, "__USE_TASK_NAME__")); + boolean dieOnError = "true".equals(ElementParameterParser.getValue(node, "__DIE_ON_ERROR__")); + boolean jobNameIsTaskName = "true".equals(ElementParameterParser.getValue(node, "__JOB_NAME_IS_TASK_NAME__")); + String taskName = ElementParameterParser.getValue(node, "__TASK_NAME__"); + String jobName = ElementParameterParser.getValue(node, "__PROCESS__"); + if (jobNameIsTaskName) { + taskName = "\"" + jobName + "\""; + } + String debug = ElementParameterParser.getValue(node, "__DEBUG__"); + String taskId = ElementParameterParser.getValue(node, "__TASK_ID__"); + String tacUrl = ElementParameterParser.getValue(node, "__TAC_URL__"); + String tacUser = ElementParameterParser.getValue(node, "__TAC_USER__"); + String tacPasswd = ElementParameterParser.getValue(node, "__TAC_PASSWD__"); + boolean asynchron = "true".equals(ElementParameterParser.getValue(node, "__ASYNCHRONOUS_RUN__")); + @SuppressWarnings("unchecked") + List> contextParams = (List>) ElementParameterParser.getObjectValue(node, "__CONTEXT_PARAMS__"); + String waitTimeUntilRunning = ElementParameterParser.getValue(node, "__WAIT_UNTIL_RUNNING_CHECK_CYCLE__"); + String timeoutUntilRunning = ElementParameterParser.getValue(node, "__WAIT_UNTIL_RUNNING_TIMEOUT__"); + String waitUntilFinished = ElementParameterParser.getValue(node, "__WAIT_UNTIL_FINISHED_CHECK_CYCLE__"); + boolean checkTaskRunning = "true".equals(ElementParameterParser.getValue(node, "__CHECK_TASK_NOT_RUNNING_SIMULTANEOUSLY__")); + @SuppressWarnings("unchecked") + List> listTasks = (List>) ElementParameterParser.getObjectValue(node, (useTaskName ? "__TASK_NOT_RUNNING_SIMULTANEOUSLY__" : "__TASK_NOT_RUNNING_SIMULTANEOUSLY_IDS__")); + boolean waitForTaskEnd = "true".equals(ElementParameterParser.getValue(node, "__WAIT_UNTIL_END__")); + boolean allowPreparing = "true".equals(ElementParameterParser.getValue(node, "__ALLOW_GENERATING__")); + String maxRepetitions = ElementParameterParser.getValue(node, "__MAX_REPETITION__"); + String waitTimeBetweenRepetition = ElementParameterParser.getValue(node, "__WAIT_MILLIS_BETWEEN_REPETITION__"); +%> + de.cimt.talendcomp.tac.TACConnection conn_<%=cid%> = new de.cimt.talendcomp.tac.TACConnection(<%=tacUrl%>); + conn_<%=cid%>.setUser(<%=tacUser%>); + conn_<%=cid%>.setPassword(<%=tacPasswd%>); +<% if (maxRepetitions != null && maxRepetitions.trim().isEmpty() == false) { %> + conn_<%=cid%>.setMaxRepeats(<%=maxRepetitions%>); +<% } %> +<% if (waitTimeBetweenRepetition != null && waitTimeBetweenRepetition.trim().isEmpty() == false) { %> + conn_<%=cid%>.setRepeatWaitTime(<%=waitTimeBetweenRepetition%>); +<% } %> + try { + String taskId_<%=cid%> = null; +<% if (useTaskName) { %> + String label_<%=cid%> = <%=taskName.trim()%>; + { + // retrieve the taskId by task name + de.cimt.talendcomp.tac.GetTaskIdByName taskIdByName = new de.cimt.talendcomp.tac.GetTaskIdByName(conn_<%=cid%>); + taskIdByName.setDebug(<%=debug%>); + try { + taskId_<%=cid%> = taskIdByName.getTaskId(<%=taskName.trim()%>); + taskIdByName.info("Task " + <%=taskName.trim()%> + ": has id: " + taskId_<%=cid%>); + } catch (Exception e) { + globalMap.put("<%=cid%>_ERROR_MESSAGE", "Retrieve taskId failed: " + e.getMessage()); + throw e; + } + } +<% } else { %> + taskId_<%=cid%> = String.valueOf(<%=taskId%>); + String label_<%=cid%> = taskId_<%=cid%>; +<% } %> + globalMap.put("<%=cid%>_TASK_ID", taskId_<%=cid%>); +<% if ("status".equals(mode)) { %> + // check the last status + { + de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); + status.setDebug(<%=debug%>); + status.setTaskId(taskId_<%=cid%>); + try { + status.execute(); + globalMap.put("<%=cid%>_STATUS", status.getStatus()); + globalMap.put("<%=cid%>_ERROR_STATUS", status.getErrorStatus()); + globalMap.put("<%=cid%>_HAS_ERRORS", status.hasErrors()); + globalMap.put("<%=cid%>_IS_READY_TO_RUN", status.isReadyToRun()); + globalMap.put("<%=cid%>_IS_PREPARING", status.isPreparing()); + globalMap.put("<%=cid%>_IS_RUNNING", status.isRunning()); + } catch (Exception e) { + String message = "Get task status for task:" + label_<%=cid%> + " failed:" + e.getMessage(); + globalMap.put("<%=cid%>_ERROR_MESSAGE", message); + throw e; + } + } +<% } else if ("run".equals(mode)) { %> + // run a task + int restartTaskCounter = 0; + startRun: while (true) { // startRun block begin + // check if the task is ready to run + { + de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); + status.setDebug(<%=debug%>); + status.setTaskId(taskId_<%=cid%>); + status.execute(); + if (status.isReadyToRun() == false && status.isRunning() == false) { + // task has any other state than READY_TO_RUN and RUNNING or REQUESTING_TO_RUN + // this could mean it is not deployed or is generating or need a generate + String message = "Task " + label_<%=cid%> + " is not not ready to run. Status:" + status.getStatus() + " ErrorStatus:" + status.getErrorStatus(); +<% if (allowPreparing) { %> + if (status.isPreparing() && status.isInReadyState() == false) { + // task is preparing + status.info(status.getTimeAsString() + "# Task " + label_<%=cid%> + " is preparing."); + while (status.isPreparing() && status.isInReadyState() == false) { + Thread.sleep(10000); // wait 10s + status.execute(); // check again + } + if (status.isPreparing()) { + status.info(status.getTimeAsString() + "# Task " + label_<%=cid%> + " has finished a preparing step."); + } else { + status.info(status.getTimeAsString() + "# Task " + label_<%=cid%> + " has finished preparing."); + } + } +<% } else { %> + // the task is not ready to run + globalMap.put("<%=cid%>_ERROR_MESSAGE", message); + // we have to die here + throw new Exception(message); +<% } %> + } + globalMap.put("<%=cid%>_STATUS", status.getStatus()); + globalMap.put("<%=cid%>_ERROR_STATUS", status.getErrorStatus()); + globalMap.put("<%=cid%>_HAS_ERRORS", status.hasErrors()); + globalMap.put("<%=cid%>_IS_READY_TO_RUN", status.isReadyToRun()); + globalMap.put("<%=cid%>_IS_PREPARING", status.isPreparing()); + globalMap.put("<%=cid%>_IS_RUNNING", status.isRunning()); + } +<% if (checkTaskRunning || waitForTaskEnd) { %> + { + // wait until tasks are finished (or simply not running) + java.util.List listTasks = new java.util.ArrayList(); + de.cimt.talendcomp.tac.GetTaskIdByName taskIdByName = new de.cimt.talendcomp.tac.GetTaskIdByName(conn_<%=cid%>); + taskIdByName.setDebug(<%=debug%>); + de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); + status.setDebug(<%=debug%>); + try { + boolean wait = true; + java.util.HashMap firstLoop = new java.util.HashMap(); + firstLoop.put(taskId_<%=cid%>, true); + while (wait) { + wait = false; + // Check if this task is not currently running + status.setTaskId(taskId_<%=cid%>); + status.execute(); + if (status.isRunning() || status.isReadyToRun() == false) { + if (firstLoop.get(taskId_<%=cid%>)) { + firstLoop.put(taskId_<%=cid%>, false); + status.info("Task " + label_<%=cid%> + ": Own task is currently started or running or not ready. Wait for end of current processing."); + } + wait = true; + Thread.sleep(1000); + continue; // no need to check another task if our task is not ready + } + Thread.sleep(100); // wait a bit to avoid over running the TAC + // wait for possible more tasks + String taskToWaitId = null; + String taskToWaitLabel = null; +<% for (Map taskMap : listTasks) { + String taskToWait = (useTaskName ? taskMap.get("TASK_NAME") : taskMap.get("TASK_ID")); + if (taskToWait != null && taskToWait.trim().isEmpty() == false) { + taskToWait = taskToWait.replace('\"',' ').trim(); %> + // by BG @ FLE +<% if (useTaskName) { %> + try { + taskToWaitId = taskIdByName.getTaskId("<%=taskToWait%>"); + taskIdByName.info("Task " + <%=taskToWait%> + ": has id: " + taskToWaitId); + } catch (Exception e) { + globalMap.put("<%=cid%>_ERROR_MESSAGE", "Retrieve taskId failed: " + e.getMessage()); + throw e; + } +<% } else { %> + taskToWaitId = String.valueOf(<%=taskToWait%>); +<% } %> + if (!firstLoop.containsKey(taskToWaitId)) { + firstLoop.put(taskToWaitId, true); + } + taskToWaitLabel = taskToWaitId; + status.setTaskId(taskToWaitId); + status.execute(); + if (status.isRunning() || status.isReadyToRun() == false) { + if (firstLoop.get(taskToWaitId)) { + firstLoop.put(taskToWaitId, false); + status.info("Precondition task " + taskToWaitLabel + " is currently started or running. Wait for its end."); + } + wait = true; + Thread.sleep(1000); + continue; // one of the necessary task is currently running, no need to check another task + } + Thread.sleep(100); // wait a bit to avoid over running the TAC +<% } %> +<% } // end for %> + } // end while + status.info("Task " + label_<%=cid%> + ": All precondition tasks are finished."); + } catch (Exception e) { + globalMap.put("<%=cid%>_ERROR_MESSAGE", "Wait for task failed: " + e.getMessage()); + throw e; + } + } +<% } %> + de.cimt.talendcomp.tac.RunTask runTask_<%=cid%> = new de.cimt.talendcomp.tac.RunTask(conn_<%=cid%>); + runTask_<%=cid%>.setDebug(<%=debug%>); + runTask_<%=cid%>.setTaskId(taskId_<%=cid%>); +<% for (Map contextParam : contextParams) { + String paramName = contextParam.get("NAME"); + String paramValue = contextParam.get("VALUE"); + if (paramName != null && paramName.trim().isEmpty() == false && paramValue != null && paramValue.trim().isEmpty() == false) { %> + runTask_<%=cid%>.addContextParam("<%=paramName.trim()%>", <%=paramValue.trim()%>); +<% } %> +<% } %> + runTask_<%=cid%>.setSynchronous(<%=(false == asynchron)%>); + long startTime = System.currentTimeMillis(); + long stopTime = 0; + try { + runTask_<%=cid%>.info(runTask_<%=cid%>.getTimeAsString() + "# Task " + label_<%=cid%> + " starting ..."); + runTask_<%=cid%>.execute(); // start the task + } catch (Exception e) { + if (runTask_<%=cid%>.needRestartWithCheckTaskStatus()) { + runTask_<%=cid%>.info("Task " + label_<%=cid%> + ": Task is still processing (not correclty recognised by the former status check) -> restart the check and run cycle."); + continue startRun; // with startRun loop + } + globalMap.put("<%=cid%>_ERROR_MESSAGE", e.getMessage()); + throw e; + } +<% if (asynchron) { %> + // simplyfied reliable request handling since Talend release 5.6 + int waitUntilFinished = <%=waitUntilFinished%>; + de.cimt.talendcomp.tac.GetTaskExecutionStatus tes = new de.cimt.talendcomp.tac.GetTaskExecutionStatus(conn_<%=cid%>); + tes.setDebug(<%=debug%>); + tes.setExecRequestId(runTask_<%=cid%>.getExecRequestId()); + tes.execute(); + // by MM @ FLE 2017-10-17 + while (!tes.getExecBasicStatus().equals("OK") + && !tes.getExecBasicStatus().equals("ERROR") + && !tes.getExecBasicStatus().equals("KILLED")) { + if (tes.isPreparing()) { + System.out.println("Task is " + tes.getExecDetailedStatus()); + } + Thread.sleep(waitUntilFinished); + tes.execute(); + } + tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": status:" + tes.getExecBasicStatus()); + tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": detailedStatus:" + tes.getExecDetailedStatus()); + Integer returnCode = tes.getJobExitCode(); + tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": returnCode:" + returnCode); + globalMap.put("<%=cid%>_RETURN_CODE", returnCode); + // check the last status again + { + de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); + status.setDebug(<%=debug%>); + status.setTaskId(taskId_<%=cid%>); + try { + status.execute(); + globalMap.put("<%=cid%>_STATUS", status.getStatus()); + globalMap.put("<%=cid%>_ERROR_STATUS", status.getErrorStatus()); + globalMap.put("<%=cid%>_HAS_ERRORS", status.hasErrors()); + globalMap.put("<%=cid%>_IS_READY_TO_RUN", status.isReadyToRun()); + globalMap.put("<%=cid%>_IS_PREPARING", status.isPreparing()); + globalMap.put("<%=cid%>_IS_RUNNING", status.isRunning()); + } catch (Exception e) { + String message = "Get task status for task:" + label_<%=cid%> + " failed:" + e.getMessage(); + globalMap.put("<%=cid%>_ERROR_MESSAGE", message); + throw e; + } + } + // by MM @ FLE 2017-10-17 + if (tes.getExecBasicStatus().equals("KILLED")) { + throw new Exception("Child job running killed by user."); + } +<% if (dieOnError) { %> + if (returnCode != null && returnCode.intValue() != 0) { + errorCode = returnCode; + globalMap.put("<%=cid%>_ERROR_MESSAGE", "Task " + label_<%=cid%> + ": Execute task failed. ErrorCode:" + errorCode); + throw new Exception("Child job running failed. ErrorCode:" + errorCode); + } +<% } %> +<% } else { // if (asynchron) %> + stopTime = System.currentTimeMillis(); + // simplyfied reliable request handling since Talend release 5.6 + int waitUntilFinished = <%=waitUntilFinished%>; + de.cimt.talendcomp.tac.GetTaskExecutionStatus tes = new de.cimt.talendcomp.tac.GetTaskExecutionStatus(conn_<%=cid%>); + tes.setDebug(<%=debug%>); + tes.setExecRequestId(runTask_<%=cid%>.getExecRequestId()); + tes.execute(); + tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": status:" + tes.getExecBasicStatus()); + tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": detailedStatus:" + tes.getExecDetailedStatus()); + Integer returnCode = tes.getJobExitCode(); + tes.info(tes.getTimeAsString() + "# Task " + label_<%=cid%> + ": returnCode:" + returnCode); + globalMap.put("<%=cid%>_RETURN_CODE", returnCode); + // check the last status again + { + de.cimt.talendcomp.tac.GetTaskStatus status = new de.cimt.talendcomp.tac.GetTaskStatus(conn_<%=cid%>); + status.setDebug(<%=debug%>); + status.setTaskId(taskId_<%=cid%>); + try { + status.execute(); + globalMap.put("<%=cid%>_STATUS", status.getStatus()); + globalMap.put("<%=cid%>_ERROR_STATUS", status.getErrorStatus()); + globalMap.put("<%=cid%>_HAS_ERRORS", status.hasErrors()); + globalMap.put("<%=cid%>_IS_READY_TO_RUN", status.isReadyToRun()); + globalMap.put("<%=cid%>_IS_PREPARING", status.isPreparing()); + globalMap.put("<%=cid%>_IS_RUNNING", status.isRunning()); + } catch (Exception e) { + String message = "Get task status for task:" + label_<%=cid%> + " failed:" + e.getMessage(); + globalMap.put("<%=cid%>_ERROR_MESSAGE", message); + throw e; + } + } + // by MM @ FLE 2017-10-17 + if (tes.getExecBasicStatus().equals("KILLED")) { + throw new Exception("Child job running killed by user."); + } +<% if (dieOnError) { %> + if (returnCode != null && returnCode.intValue() != 0) { + errorCode = returnCode; + globalMap.put("<%=cid%>_ERROR_MESSAGE", "Task " + label_<%=cid%> + ": Execute task failed. ErrorCode:" + errorCode); + throw new Exception("Child job running failed. ErrorCode:" + errorCode); + } +<% } %> +<% } // if (asynchron) %> + runTask_<%=cid%>.info("Task " + label_<%=cid%> + ": ready."); + if (stopTime > 0) { // set duration only if we have wait until the end + globalMap.put("<%=cid%>_RUN_DURATION", (stopTime - startTime)); + } + break; + } // startRun block end +<% } else { // mode unknown %> + // unkown mode detected + if (true) throw new IllegalStateException("Operational mode: <%=mode%> is unknown!"); +<% } %> + } finally { + conn_<%=cid%>.close(); + } diff --git a/talend_component/tRunTask/tRunTask_java.xml b/talend_component/tRunTask/tRunTask_java.xml index a27e9e2..4c371c9 100644 --- a/talend_component/tRunTask/tRunTask_java.xml +++ b/talend_component/tRunTask/tRunTask_java.xml @@ -1,106 +1,111 @@ - +
- -
- - System - Orchestration - - - http://jan-lolling.de/talend/components/help/tRunTask.pdf - - - - - - - - - - - - - - - - - - - "http://localhost:8080/org.talend.administrator" - - - "admin@company.com" - - - "admin" - - - false - - - false - - - "" - - - 1 - - - - - - - - - - false - - - true - - - 200 - - - 10000 - - - 1000 - - - false - - - - - - - - false - - - - - false - - - false - - - 1 - - - 5000 - + + + + System + Orchestration + + + http://jan-lolling.de/talend/components/help/tRunTask.pdf + + + + + + + + + + + + + + + + + + + "http://localhost:8080/org.talend.administrator" + + + "admin@company.com" + + + "admin" + + + false + + + false + + + "" + + + 1 + + + + + + + + + + false + + + true + + + 200 + + + 10000 + + + 1000 + + + false + + + + + + + + + + + + + false + + + + + false + + + false + + + 1 + + + 5000 + Release: 3.2 build at: 20171130 - - - + + + @@ -125,17 +130,17 @@ - - - - - - - - - - - - - + + + + + + + + + + + + +
diff --git a/talend_component/tRunTask/tRunTask_messages.properties b/talend_component/tRunTask/tRunTask_messages.properties index 50472a2..7e8db0f 100644 --- a/talend_component/tRunTask/tRunTask_messages.properties +++ b/talend_component/tRunTask/tRunTask_messages.properties @@ -16,8 +16,10 @@ WAIT_UNTIL_FINISHED_CHECK_CYCLE.NAME=Check time cycle until job has been finishe DIE_ON_ERROR.NAME=Die on error (communication errors to the TAC will always let the job die) RETURN_CODE.NAME=Return code from task job CHECK_TASK_NOT_RUNNING_SIMULTANEOUSLY.NAME=Wait until this task and none of the task in the next list are running -TASK_NOT_RUNNING_SIMULTANEOUSLY.NAME=List of task -TASK_NOT_RUNNING_SIMULTANEOUSLY.ITEM.TASK_NAME=Task name +TASK_NOT_RUNNING_SIMULTANEOUSLY.NAME=List of task labels +TASK_NOT_RUNNING_SIMULTANEOUSLY.ITEM.TASK_NAME=Task name (Label of the task in the TAC, this is NOT the job name!) +TASK_NOT_RUNNING_SIMULTANEOUSLY_IDS.NAME=List of task IDs +TASK_NOT_RUNNING_SIMULTANEOUSLY_IDS.ITEM.TASK_ID=Task ID WAIT_UNTIL_RUNNING_TIMEOUT.NAME=Timeout for check until running WAIT_UNTIL_END.NAME=Wait until task ends DEBUG.NAME=Debug output requests and response