2222import org .slf4j .LoggerFactory ;
2323
2424import java .util .*;
25+ import java .util .concurrent .BlockingQueue ;
2526import java .util .concurrent .ConcurrentHashMap ;
2627import java .util .concurrent .ConcurrentMap ;
28+ import java .util .concurrent .LinkedBlockingQueue ;
2729import java .util .concurrent .atomic .AtomicInteger ;
2830
2931class AssignableVMs {
@@ -54,12 +56,21 @@ private void reset() {
5456 }
5557 }
5658
59+ private static class HostDisablePair {
60+ private final String host ;
61+ private final Long until ;
62+
63+ HostDisablePair (String host , Long until ) {
64+ this .host = host ;
65+ this .until = until ;
66+ }
67+ }
68+
5769 private final VMCollection vmCollection ;
58- private final Action1 <VirtualMachineLease > leaseRejectAction ;
59- private final long leaseOfferExpirySecs ;
6070 private static final Logger logger = LoggerFactory .getLogger (AssignableVMs .class );
6171 private final ConcurrentMap <String , String > leaseIdToHostnameMap = new ConcurrentHashMap <>();
6272 private final ConcurrentMap <String , String > vmIdToHostnameMap = new ConcurrentHashMap <>();
73+ private final BlockingQueue <HostDisablePair > disableRequests = new LinkedBlockingQueue <>();
6374 private final TaskTracker taskTracker ;
6475 private final String attrNameToGroupMaxResources ;
6576 private final Map <String , Map <VMResource , Double >> maxResourcesMap ;
@@ -74,7 +85,6 @@ void assignResult(TaskAssignmentResult result) {
7485 private final ActiveVmGroups activeVmGroups ;
7586 private String activeVmGroupAttributeName =null ;
7687 private final List <String > unknownLeaseIdsToExpire = new ArrayList <>();
77- private final boolean singleLeaseMode ;
7888
7989 AssignableVMs (TaskTracker taskTracker , Action1 <VirtualMachineLease > leaseRejectAction ,
8090 long leaseOfferExpirySecs , int maxOffersToReject ,
@@ -85,14 +95,11 @@ void assignResult(TaskAssignmentResult result) {
8595 leaseRejectAction , leaseOfferExpirySecs , taskTracker , singleLeaseMode ),
8696 autoScaleByAttributeName
8797 );
88- this .leaseRejectAction = leaseRejectAction ;
89- this .leaseOfferExpirySecs = leaseOfferExpirySecs ;
9098 this .attrNameToGroupMaxResources = attrNameToGroupMaxResources ;
9199 maxResourcesMap = new HashMap <>();
92100 totalResourcesMap = new HashMap <>();
93101 vmRejectLimiter = new VMRejectLimiter (maxOffersToReject , leaseOfferExpirySecs ); // ToDo make this configurable?
94102 activeVmGroups = new ActiveVmGroups ();
95- this .singleLeaseMode = singleLeaseMode ;
96103 }
97104
98105 VMCollection getVmCollection () {
@@ -170,9 +177,19 @@ void expireAllLeases() {
170177 }
171178
172179 void disableUntil (String host , long until ) {
173- final Optional <AssignableVirtualMachine > vmByName = vmCollection .getVmByName (host );
174- if (vmByName .isPresent ())
175- vmByName .get ().setDisabledUntil (until );
180+ disableRequests .offer (new HostDisablePair (host , until ));
181+ }
182+
183+ private void disableVMs () {
184+ if (disableRequests .peek () == null )
185+ return ;
186+ List <HostDisablePair > disablePairs = new LinkedList <>();
187+ disableRequests .drainTo (disablePairs );
188+ for (HostDisablePair hostDisablePair : disablePairs ) {
189+ final Optional <AssignableVirtualMachine > vmByName = vmCollection .getVmByName (hostDisablePair .host );
190+ if (vmByName .isPresent ())
191+ vmByName .get ().setDisabledUntil (hostDisablePair .until );
192+ }
176193 }
177194
178195 void enableVM (String host ) {
@@ -212,6 +229,7 @@ private void expireAnyUnknownLeaseIds() {
212229 }
213230
214231 List <AssignableVirtualMachine > prepareAndGetOrderedVMs (List <VirtualMachineLease > newLeases , AtomicInteger rejectedCount ) {
232+ disableVMs ();
215233 expireAnyUnknownLeaseIds ();
216234 removeExpiredLeases ();
217235 rejectedCount .addAndGet (addLeases (newLeases ));
0 commit comments