View Javadoc
1   //******************************************************************************
2   //
3   // File:    ParallelTeamThread.java
4   // Package: edu.rit.pj
5   // Unit:    Class edu.rit.pj.ParallelTeamThread
6   //
7   // This Java source file is copyright (C) 2007 by Alan Kaminsky. All rights
8   // reserved. For further information, contact the author, Alan Kaminsky, at
9   // ark@cs.rit.edu.
10  //
11  // This Java source file is part of the Parallel Java Library ("PJ"). PJ is free
12  // software; you can redistribute it and/or modify it under the terms of the GNU
13  // General Public License as published by the Free Software Foundation; either
14  // version 3 of the License, or (at your option) any later version.
15  //
16  // PJ is distributed in the hope that it will be useful, but WITHOUT ANY
17  // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
18  // A PARTICULAR PURPOSE. See the GNU General Public License for more details.
19  //
20  // Linking this library statically or dynamically with other modules is making a
21  // combined work based on this library. Thus, the terms and conditions of the GNU
22  // General Public License cover the whole combination.
23  //
24  // As a special exception, the copyright holders of this library give you
25  // permission to link this library with independent modules to produce an
26  // executable, regardless of the license terms of these independent modules, and
27  // to copy and distribute the resulting executable under terms of your choice,
28  // provided that you also meet, for each linked independent module, the terms
29  // and conditions of the license of that module. An independent module is a module
30  // which is not derived from or based on this library. If you modify this library,
31  // you may extend this exception to your version of the library, but you are not
32  // obligated to do so. If you do not wish to do so, delete this exception
33  // statement from your version.
34  //
35  // A copy of the GNU General Public License is provided in the file gpl.txt. You
36  // may also obtain a copy of the GNU General Public License on the World Wide
37  // Web at http://www.gnu.org/licenses/gpl.html.
38  //
39  //******************************************************************************
40  
41  //******************************************************************************
42  // File modified 10/8/2014 by Jacob Litman to enable garbage collection of
43  // ParallelTeamThreads.
44  //******************************************************************************
45  package edu.rit.pj;
46  
47  import edu.rit.pj.reduction.SharedLong;
48  import java.util.concurrent.Semaphore;
49  
50  /**
51   * Class ParallelTeamThread provides one thread in a {@linkplain ParallelTeam}
52   * of threads for executing a {@linkplain ParallelRegion} in parallel.
53   *
54   * @author Alan Kaminsky
55   * @version 20-Dec-2007
56   */
57  class ParallelTeamThread
58          implements Runnable {
59  
60      // Hidden data members.
61      // Reference to the parallel team.
62      ParallelTeam myTeam;
63  
64      // Index of this thread within the parallel team.
65      int myIndex;
66  
67      // Semaphore for synchronizing threads at the beginning of a parallel
68      // region.
69      Semaphore myRegionBeginSemaphore = new Semaphore(0);
70  
71      // Thread barrier flag. Used by the ParallelRegion.barrier() method.
72      volatile int myBarrierFlag;
73  
74      // Parallel construct counter. Counts how many times this thread has arrived
75      // at the top of a parallel construct.
76      volatile int myConstructCount;
77  
78      // IntegerSchedule object for a parallel for loop.
79      volatile IntegerSchedule myIntegerSchedule;
80  
81      // LongSchedule object for a parallel for loop.
82      volatile LongSchedule myLongSchedule;
83  
84      // Item generator object for a parallel iteration.
85      volatile ItemGenerator<?> myItemGenerator;
86  
87      // Exception thrown while setting up a parallel construct, or null if none.
88      volatile Throwable myConstructException;
89  
90      // The thread instance
91      private final Thread thread;
92  
93      /**
94       * Number of threads created by the ParallelTeamThread constructor.
95       */
96      public static SharedLong totalThreads = new SharedLong(0);
97  
98      // 128 bytes of extra padding to avert cache interference.
99      private long p0, p1, p2, p3, p4, p5, p6, p7;
100     private long p8, p9, pa, pb, pc, pd, pe, pf;
101 
102 // Exported constructors.
103 
104     /**
105      * Construct a new parallel team thread.
106      *
107      * @param theTeam  Parallel team to which this thread belongs.
108      * @param theIndex Index of this thread within the team.
109      */
110     public ParallelTeamThread(ParallelTeam theTeam,
111                               int theIndex) {
112         myTeam = theTeam;
113         myIndex = theIndex;
114 
115         String name = "ParallelTeamThread-" + theIndex;
116 
117         boolean isVirtual = PJProperties.getPjVt();
118         if (isVirtual) {
119             // Create a virtual thread (all virtual threads are daemon threads).
120             thread = Thread.ofVirtual().name(name).unstarted(this);
121         } else {
122             // Create a new thread for this parallel team thread.
123             thread = Thread.ofPlatform().name(name).unstarted(this);
124             thread.setDaemon(true);
125         }
126 
127         long numThreads = totalThreads.incrementAndGet();
128         // System.out.printf(" Creating team %s thread %s out of %d.\n", myTeam, name, numThreads);
129 
130         thread.start();
131     }
132 
133 // Exported operations.
134 
135     /**
136      * Get the underlying thread.
137      *
138      * @return The Thread instance
139      */
140     public Thread getThread() {
141         return thread;
142     }
143 
144     /**
145      * Run this parallel team thread.
146      */
147     @Override
148     public void run() {
149         for (;;) {
150             // Wait until released by the main thread.
151             myRegionBeginSemaphore.acquireUninterruptibly();
152 
153             if (myTeam.myRegion instanceof KillRegion) {
154                 try {
155                     myTeam.myRegion.run();
156                 } catch (Exception ex) {
157                     System.err.printf("Error exiting parallel team thread: %s%n", ex);
158                 }
159 
160                 // Exit the for loop and run method, which will allow threads to be garbage collected.
161                 break;
162             }
163             
164             // Call the parallel region's run() method. Save any
165             // exception for later.
166             try {
167                 myTeam.myRegion.run();
168             } catch (Throwable exc) {
169                 synchronized (System.err) {
170                     System.err.println("Parallel team thread " + myIndex
171                             + ": ParallelRegion.run() threw an exception");
172                     exc.printStackTrace(System.err);
173                     if (exc.getCause() != null) {
174                         exc.getCause().printStackTrace(System.err);
175                     }
176                 }
177                 myTeam.myExceptionMap.put(myIndex, exc);
178             }
179 
180             // Tell the main thread we're done.
181             myTeam.myRegionEndSemaphore.release();
182         }
183         myTeam.myRegionEndSemaphore.release();
184     }
185 
186 // Hidden operations.
187     /**
188      * Do this thread's portion of a barrier with no barrier action. This method
189      * is called by thread 1 through thread K-1 of the parallel team.
190      */
191     void barrier() {
192         // Get the new team barrier flag.
193         int newBarrierFlag = myTeam.myBarrierFlag ^ 1;
194 
195         // Switch this thread to the new barrier flag.
196         myBarrierFlag = newBarrierFlag;
197 
198         // Wait until thread 0 has switched to the new team barrier flag.
199         if (myTeam.myBarrierFlag != newBarrierFlag) {
200             Spinner spinner = new Spinner();
201             while (myTeam.myBarrierFlag != newBarrierFlag) {
202                 spinner.spin();
203             }
204         }
205     }
206 
207     /**
208      * Do this thread's portion of a barrier with a barrier action. This method
209      * is called by thread 1 through thread K-1 of the parallel team.
210      *
211      * @param action Barrier action.
212      *
213      * @exception Exception Thrown if the <code>action</code>'s <code>run()</code>
214      * method throws an exception.
215      */
216     void barrier(BarrierAction action)
217             throws Exception {
218         barrier();
219     }
220 
221     /**
222      * Do processing when this thread arrives at the top of a parallel
223      * construct.
224      *
225      * @return True if this thread is the first to arrive, false otherwise.
226      */
227     boolean arriveAtParallelConstruct() {
228         // Wait until every thread's construct count is greater than or equal to
229         // this thread's construct count.
230         int K = myTeam.K;
231         for (int i = 0; i < K; ++i) {
232             ParallelTeamThread thread_i = myTeam.myThread[i];
233             if (thread_i.myConstructCount < this.myConstructCount) {
234                 Spinner spinner = new Spinner();
235                 while (thread_i.myConstructCount < this.myConstructCount) {
236                     spinner.spin();
237                 }
238             }
239         }
240 
241         // Determine if this thread is the first to arrive.
242         for (;;) {
243             int oldConstructCount = myTeam.myConstructCount.get();
244             if (oldConstructCount != myConstructCount) {
245                 // This thread is not the first to arrive.
246                 myConstructCount = oldConstructCount;
247                 return false;
248             }
249             // This thread may be the first to arrive.
250             if (myTeam.myConstructCount.compareAndSet(oldConstructCount, oldConstructCount + 1)) {
251                 // This thread is the first to arrive.
252                 myConstructCount = oldConstructCount + 1;
253                 return true;
254             }
255         }
256     }
257 
258     /**
259      * Set this thread's IntegerSchedule.
260      *
261      * @param theSchedule Integer schedule.
262      */
263     void setIntegerSchedule(IntegerSchedule theSchedule) {
264         // Wait until myIntegerSchedule is null.
265         if (myIntegerSchedule != null) {
266             Spinner spinner = new Spinner();
267             while (myIntegerSchedule != null) {
268                 spinner.spin();
269             }
270         }
271 
272         myIntegerSchedule = theSchedule;
273     }
274 
275     /**
276      * Get this thread's IntegerSchedule.
277      *
278      * @return IntegerSchedule.
279      *
280      * @exception Exception This method can throw any exception.
281      */
282     IntegerSchedule getIntegerSchedule()
283             throws Exception {
284         // Wait until myIntegerSchedule or myConstructException is set to a
285         // non-null value.
286         if (myIntegerSchedule == null && myConstructException == null) {
287             Spinner spinner = new Spinner();
288             while (myIntegerSchedule == null && myConstructException == null) {
289                 spinner.spin();
290             }
291         }
292 
293         // Make temporary copies and null out originals.
294         IntegerSchedule schedule = myIntegerSchedule;
295         myIntegerSchedule = null;
296         Throwable exc = myConstructException;
297         myConstructException = null;
298 
299         // Re-throw exception if necessary.
300         ParallelTeam.rethrow(exc);
301 
302         return schedule;
303     }
304 
305     /**
306      * Set this thread's LongSchedule.
307      *
308      * @param theSchedule Long schedule.
309      */
310     void setLongSchedule(LongSchedule theSchedule) {
311         // Wait until myLongSchedule is null.
312         if (myLongSchedule != null) {
313             Spinner spinner = new Spinner();
314             while (myLongSchedule != null) {
315                 spinner.spin();
316             }
317         }
318 
319         myLongSchedule = theSchedule;
320     }
321 
322     /**
323      * Get this thread's LongSchedule.
324      *
325      * @return LongSchedule.
326      *
327      * @exception Exception This method can throw any exception.
328      */
329     LongSchedule getLongSchedule()
330             throws Exception {
331         // Wait until myLongSchedule or myConstructException is set to a
332         // non-null value.
333         if (myLongSchedule == null && myConstructException == null) {
334             Spinner spinner = new Spinner();
335             while (myLongSchedule == null && myConstructException == null) {
336                 spinner.spin();
337             }
338         }
339 
340         // Make temporary copies and null out originals.
341         LongSchedule schedule = myLongSchedule;
342         myLongSchedule = null;
343         Throwable exc = myConstructException;
344         myConstructException = null;
345 
346         // Re-throw exception if necessary.
347         ParallelTeam.rethrow(exc);
348 
349         return schedule;
350     }
351 
352     /**
353      * Set this thread's ItemGenerator.
354      *
355      * @param theItemGenerator Item generator.
356      */
357     void setItemGenerator(ItemGenerator<?> theItemGenerator) {
358         // Wait until myItemGenerator is null.
359         if (myItemGenerator != null) {
360             Spinner spinner = new Spinner();
361             while (myItemGenerator != null) {
362                 spinner.spin();
363             }
364         }
365 
366         myItemGenerator = theItemGenerator;
367     }
368 
369     /**
370      * Get this thread's ItemGenerator.
371      *
372      * @return Item generator.
373      *
374      * @exception Exception This method can throw any exception.
375      */
376     ItemGenerator<?> getItemGenerator()
377             throws Exception {
378         // Wait until myItemGenerator or myConstructException is set to a
379         // non-null value.
380         if (myItemGenerator == null && myConstructException == null) {
381             Spinner spinner = new Spinner();
382             while (myItemGenerator == null && myConstructException == null) {
383                 spinner.spin();
384             }
385         }
386 
387         // Make temporary copies and null out originals.
388         ItemGenerator<?> generator = myItemGenerator;
389         myItemGenerator = null;
390         Throwable exc = myConstructException;
391         myConstructException = null;
392 
393         // Re-throw exception if necessary.
394         ParallelTeam.rethrow(exc);
395 
396         return generator;
397     }
398 
399     /**
400      * Set this thread's construct exception.
401      *
402      * @param theException Construct exception.
403      */
404     void setConstructException(Throwable theException) {
405         // Wait until myConstructException is null.
406         if (myConstructException != null) {
407             Spinner spinner = new Spinner();
408             while (myConstructException != null) {
409                 spinner.spin();
410             }
411         }
412 
413         myConstructException = theException;
414     }
415 
416 }