-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFSParentQueue.py
More file actions
107 lines (77 loc) · 3.24 KB
/
FSParentQueue.py
File metadata and controls
107 lines (77 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
'''
Created on Jan 8, 2015
@author: niuzhaojie
'''
import functools
import math

from FSQueue import FSQueue
from Resources import Resources
from policies.PolicyParser import PolicyParser
class FSParentQueue(FSQueue):
    '''
    Non-leaf queue that owns a list of child queues.

    Demand, resource usage and the list of app schedulables are aggregated
    over the children, and share recomputation and container assignment are
    delegated to them.
    '''

    def __init__(self, name, parent, scheduler):
        '''
        Create a parent queue with no children and a zeroed demand.

        name, parent and scheduler are forwarded unchanged to the FSQueue
        constructor.
        '''
        super(FSParentQueue, self).__init__(name, parent, scheduler)
        self._childQueues = []
        # Cached aggregate demand; rebuilt from scratch by updateDemand().
        self._demand = Resources.createResource(0, 0, 0, 0)

    def addChildQueue(self, child):
        '''Append child to this queue's list of child queues.'''
        self._childQueues.append(child)

    def recomputeShares(self):
        '''
        Split this queue's fair share among its children, then recurse.

        The split is performed by this queue's policy via computeShares();
        every child is then asked to recompute its own shares.
        '''
        self._policy.computeShares(self._childQueues, self.getFairShare())
        for childQueue in self._childQueues:
            childQueue.recomputeShares()

    def getDemand(self):
        '''Return the demand cached by the most recent updateDemand() call.'''
        return self._demand

    def getResourceUsage(self):
        '''
        Return the summed resource usage of all child queues.

        A fresh resource object is created on every call and each child's
        usage is accumulated into it with Resources.addTo().
        '''
        usage = Resources.createResource(0, 0, 0, 0)
        for child in self._childQueues:
            Resources.addTo(usage, child.getResourceUsage())
        return usage

    def updateDemand(self):
        '''
        Refresh every child's demand and rebuild this queue's cached demand.

        The cached demand is reset to a new zeroed resource, then each
        child's freshly updated demand is accumulated into it.
        '''
        self._demand = Resources.createResource(0, 0, 0, 0)
        for childQueue in self._childQueues:
            childQueue.updateDemand()
            Resources.addTo(self._demand, childQueue.getDemand())

    def assignContainer(self, node):
        '''
        Offer node to the child queues until one of them assigns something.

        Steps:
          1. Sort the children in place with this queue's policy comparator.
          2. Pre-select the leading ceil(len(children) * selectivity)
             children (at least one, at most all), where
             selectivity = 1 - scheduler.getTradeoff().
          3. Re-order the pre-selected children with the "MRF" policy
             comparator and offer the node to them in that order.
          4. If none of them assigned anything, offer the node to the
             remaining children in policy order.

        With a tradeoff of 1 only the first policy-ordered child is
        pre-selected, so the children are effectively tried in plain policy
        order.

        Returns the first child result that is not equal to
        Resources.none(); otherwise a value equal to Resources.none().
        Nothing is attempted when node already has a reserved container.
        '''
        assigned = Resources.none()
        if node.getReservedContainer() is not None:
            return assigned

        # performance and fairness tradeoff
        selectivity = 1 - self._scheduler.getTradeoff()

        # The comparators are cmp-style two-argument functions (they used to
        # be passed positionally to list.sort, i.e. as Python 2's "cmp").
        # Wrapping them with cmp_to_key yields the same ordering and also
        # works on Python 3, where list.sort only accepts key/reverse.

        # first, sort by current policy
        self._childQueues.sort(
            key=functools.cmp_to_key(self._policy.getComparator()))

        # second, filter according to selectivity
        childCount = len(self._childQueues)
        end = int(min(childCount,
                      max(1, math.ceil(childCount * selectivity))))
        selectedChildQueues = self._childQueues[0:end]

        # third, order the selected list according to fitness
        multiResFitnessComparator = PolicyParser.getInstance(
            "MRF", self._scheduler.getClusterCapacity()).getComparator()
        selectedChildQueues.sort(
            key=functools.cmp_to_key(multiResFitnessComparator))

        for child in selectedChildQueues:
            assigned = child.assignContainer(node)
            if not Resources.equals(assigned, Resources.none()):
                break

        # Fall back to the children that were filtered out, still in the
        # policy order established by the first sort.
        if Resources.equals(assigned, Resources.none()):
            for child in self._childQueues[end:]:
                assigned = child.assignContainer(node)
                if not Resources.equals(assigned, Resources.none()):
                    break

        return assigned

    def getChildQueues(self):
        '''Return the live list of child queues (not a copy).'''
        return self._childQueues

    def getAllAppSchedulables(self):
        '''Return one flat list of the app schedulables of every child.'''
        apps = []
        for child in self._childQueues:
            apps.extend(child.getAllAppSchedulables())
        return apps