Getting User Input After Submitting the Job
Jobs in Speedwagon are limited in user interaction once a job has been submitted. As of version 0.4.0, there is no way to request feedback from a user once the tasks have started to run.
However, there is a single point in the lifespan of a job at which a user can be asked for additional information after the job has been submitted.
If the Workflow
derived class implements
the get_additional_info()
method, the user can be asked to
provide additional information to configure the job after it has already started but
before discover_task_metadata()
is called and before the
main tasks of the job begin.
It is intended to be used with the initial_task()
method of the derived
Workflow,
because the
Results
from these tasks
are available to discover_task_metadata(). For example:
class LocateBadFiles(speedwagon.tasks.Subtask):
    """Contrived subtask that reports a fixed list of "bad" file names."""

    def work(self) -> bool:
        """Store the hard-coded file names as this task's results.

        Returns:
            Always True.
        """
        located_files = ["file1.txt", "file2.txt"]
        self.set_results(located_files)
        return True
class WorkflowRemoveBadFiles(speedwagon.Workflow):
    """Example workflow: locate "bad files", ask the user, delete files."""

    name = "Remove Bad Files"

    def initial_task(
        self, task_builder: TaskBuilder, **user_args
    ) -> None:
        """Queue the pre-task whose results feed the later steps."""
        # Contrived task that locates "bad files"
        locate_task = LocateBadFiles()
        task_builder.add_subtask(locate_task)

    def get_additional_info(
        self,
        user_request_factory: UserRequestFactory,
        options: dict,
        pretask_results: list,
    ) -> dict:
        """Ask the user which of the possible bad files are okay to delete.

        Returns:
            The response produced by the removal-confirmation request.
        """
        # Build the confirm-removal request and hand it both the job
        # options and the results of the initial task in one step.
        return user_request_factory.confirm_removal().get_user_response(
            options, pretask_results
        )

    def discover_task_metadata(
        self,
        initial_results: List[speedwagon.tasks.Result[List[str]]],
        additional_data: Dict[str, Any],
        **user_args
    ) -> List[dict]:
        """Create one ``{"file_name": ...}`` entry per located file.

        Only results whose source is LocateBadFiles contribute entries.

        NOTE(review): additional_data (presumably the user's response from
        get_additional_info) is not read here, so every located file is
        returned regardless of what the user approved - confirm whether
        the list should be filtered by the user's selection.
        """
        return [
            {"file_name": bad_file}
            for result in initial_results
            if result.source == LocateBadFiles
            for bad_file in result.data
        ]

    def create_new_task(
        self, task_builder: TaskBuilder, **job_args
    ) -> None:
        """Queue a DeleteFile subtask for the job's ``file_name`` entry."""
        delete_task = speedwagon.tasks.filesystem.DeleteFile(
            path=job_args["file_name"]
        )
        task_builder.add_subtask(delete_task)