You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
iceraven-browser/taskcluster/fenix_taskgraph/target_tasks.py

120 lines
4.0 KiB
Python

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
from redo import retry
from taskgraph.target_tasks import _target_task
from taskgraph.util.taskcluster import find_task_id
def index_exists(index_path, reason=""):
print(f"Looking for existing index {index_path} {reason}...")
try:
task_id = find_task_id(index_path)
print(f"Index {index_path} exists: taskId {task_id}")
return True
except KeyError:
print(f"Index {index_path} doesn't exist.")
return False
@_target_task("promote")
def target_tasks_promote(full_task_graph, parameters, graph_config):
def filter(task, parameters):
if (
task.attributes.get("release-type") == parameters["release_type"]
and task.attributes.get("shipping_phase") == "promote"
):
return True
return [l for l, t in full_task_graph.tasks.items() if filter(t, parameters)]
@_target_task("ship")
def target_tasks_ship(full_task_graph, parameters, graph_config):
filtered_for_candidates = target_tasks_promote(
full_task_graph,
parameters,
graph_config,
)
def filter(task, parameters):
# Include promotion tasks; these will be optimized out
if task.label in filtered_for_candidates:
return True
if (
task.attributes.get("release-type") == parameters["release_type"]
and task.attributes.get("shipping_phase") == "ship"
):
return True
return [l for l, t in full_task_graph.tasks.items() if filter(t, parameters)]
@_target_task("nightly")
def target_tasks_nightly(full_task_graph, parameters, graph_config):
"""Select the set of tasks required for a nightly build."""
def filter(task, parameters):
return task.attributes.get("nightly", False)
index_path = (
f"{graph_config['trust-domain']}.v2.{parameters['project']}.branch."
f"{parameters['head_ref']}.revision.{parameters['head_rev']}.taskgraph.decision-nightly"
)
if os.environ.get("MOZ_AUTOMATION") and retry(
index_exists,
args=(index_path,),
kwargs={
"reason": "to avoid triggering multiple nightlies off the same revision",
},
):
return []
return [l for l, t in full_task_graph.tasks.items() if filter(t, parameters)]
@_target_task("nightly-test")
def target_tasks_nightly_test(full_task_graph, parameters, graph_config):
"""Select the set of tasks required for a nightly build."""
def filter(task, parameters):
return task.attributes.get("nightly-test", False)
return [l for l, t in full_task_graph.tasks.items() if filter(t, parameters)]
def _filter_fennec(fennec_type, task, parameters):
return task.attributes.get("build-type", "") == "fennec-{}".format(fennec_type)
@_target_task("fennec-production")
def target_tasks_fennec_nightly(full_task_graph, parameters, graph_config):
"""Select the set of tasks required for a production build signed with the fennec key."""
return [
l
for l, t in full_task_graph.tasks.items()
if _filter_fennec("production", t, parameters)
]
@_target_task("screenshots")
def target_tasks_screnshots(full_task_graph, parameters, graph_config):
"""Select the set of tasks required to generate screenshots on a real device."""
def filter(task, parameters):
return task.attributes.get("screenshots", False)
return [l for l, t in full_task_graph.tasks.items() if filter(t, parameters)]
@_target_task("legacy_api_ui_tests")
def target_tasks_legacy_api_ui_tests(full_task_graph, parameters, graph_config):
"""Select the set of tasks required to run select UI tests on other API."""
def filter(task, parameters):
return task.attributes.get("legacy", False)
return [l for l, t in full_task_graph.tasks.items() if filter(t, parameters)]