001    /*
002     *                    BioJava development code
003     *
004     * This code may be freely distributed and modified under the
005     * terms of the GNU Lesser General Public Licence.  This should
006     * be distributed with the code.  If you do not have a copy,
007     * see:
008     *
009     *      http://www.gnu.org/copyleft/lesser.html
010     *
011     * Copyright for this code is held jointly by the individual
012     * authors.  These should be listed in @author doc comments.
013     *
014     * For more information on the BioJava project and its aims,
015     * or to join the biojava-l mailing list, visit the home page
016     * at:
017     *
018     *      http://www.biojava.org/
019     *
020     * Created on May 26, 2010
021     * Author: Mark Chapman
022     */
023    
024    package org.biojava3.core.util;
025    
026    import java.util.concurrent.Callable;
027    import java.util.concurrent.Future;
028    import java.util.concurrent.LinkedBlockingQueue;
029    import java.util.concurrent.ThreadPoolExecutor;
030    import java.util.concurrent.TimeUnit;
031    
/**
 * Static utility to easily share a thread pool for concurrent/parallel/lazy execution.  To exit cleanly,
 * {@link #shutdown()} or {@link #shutdownAndAwaitTermination()} must be called after all tasks have been submitted.
 * <p>
 * Every read and write of the shared pool reference happens while holding the {@code ConcurrencyTools.class}
 * monitor, so the pool may be obtained, replaced and shut down from several threads.  Task submission itself is
 * delegated to the {@link ThreadPoolExecutor} outside of that monitor.
 *
 * @author Mark Chapman
 */
public final class ConcurrencyTools {

    // shared pool; only accessed while holding the ConcurrencyTools.class monitor
    private static ThreadPoolExecutor pool;

    // TODO additional logging and listening services

    // prevents instantiation
    private ConcurrencyTools() { }

    /**
     * Returns current shared thread pool.  Starts up a new pool, if necessary (that is, when no pool has been set
     * yet or the current one has been shut down).
     *
     * @return shared thread pool
     */
    public static synchronized ThreadPoolExecutor getThreadPool() {
        if (pool == null || pool.isShutdown()) {
            setThreadPoolDefault();
        }
        return pool;
    }

    /**
     * Sets thread pool to reserve a given number of processor cores for foreground or other use.  The pool always
     * keeps at least one thread.
     *
     * @param cpus number of processor cores to reserve
     */
    public static void setThreadPoolCPUsAvailable(int cpus) {
        setThreadPoolSize(Math.max(1, Runtime.getRuntime().availableProcessors() - cpus));
    }

    /**
     * Sets thread pool to a given fraction of the available processors.  The pool always keeps at least one thread.
     *
     * @param fraction portion of available processors to use in thread pool
     */
    public static void setThreadPoolCPUsFraction(float fraction) {
        setThreadPoolSize(Math.max(1, Math.round(fraction * Runtime.getRuntime().availableProcessors())));
    }

    /**
     * Sets thread pool to default of 1 background thread for each processor core.
     */
    public static void setThreadPoolDefault() {
        setThreadPoolCPUsAvailable(0);
    }

    /**
     * Sets thread pool to a single background thread.
     */
    public static void setThreadPoolSingle() {
        setThreadPoolSize(1);
    }

    /**
     * Sets thread pool to given size.  The new pool has a fixed number of threads and an unbounded task queue.
     *
     * @param threads number of threads in pool
     * @throws IllegalArgumentException if {@code threads} is not positive (raised by the
     *         {@link ThreadPoolExecutor} constructor; the current pool is left untouched in that case)
     */
    public static void setThreadPoolSize(int threads) {
        setThreadPool(new ThreadPoolExecutor(threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
    }

    /**
     * Sets thread pool to any given {@link ThreadPoolExecutor} to allow use of an alternative execution style.
     * The previously shared pool, if any, is shut down; passing the pool that is already shared does nothing.
     *
     * @param pool thread pool to share; {@code null} clears the shared pool so that the next call to
     *        {@link #getThreadPool()} starts a default one
     */
    public static synchronized void setThreadPool(ThreadPoolExecutor pool) {
        if (ConcurrencyTools.pool != pool) {
            shutdown(); // retire the pool being replaced so its worker threads can exit
            ConcurrencyTools.pool = pool;
        }
    }

    /**
     * Disables new tasks from being submitted and closes the thread pool cleanly.  Does nothing if no pool has
     * been created yet.
     */
    public static synchronized void shutdown() {
        if (pool != null) {
            pool.shutdown();
        }
    }

    /**
     * Closes the thread pool.  Waits 1 minute for a clean exit; if necessary, waits another minute for cancellation.
     * If the calling thread is interrupted while waiting, running tasks are cancelled and the interrupt status of
     * the calling thread is restored.
     */
    public static void shutdownAndAwaitTermination() {
        // Snapshot the pool under the lock, then wait outside of it: the wait may take minutes and must
        // neither block other callers nor end up acting on a pool installed in the meantime.
        final ThreadPoolExecutor closing;
        synchronized (ConcurrencyTools.class) {
            shutdown();
            closing = pool;
        }
        if (closing == null) {
            return;
        }
        try {
            // wait a while for existing tasks to terminate
            if (!closing.awaitTermination(60L, TimeUnit.SECONDS)) {
                closing.shutdownNow(); // cancel currently executing tasks
                // wait a while for tasks to respond to being canceled
                if (!closing.awaitTermination(60L, TimeUnit.SECONDS)) {
                    System.err.println("BioJava ConcurrencyTools thread pool did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            closing.shutdownNow(); // (re-)cancel if current thread also interrupted
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }

    /**
     * Queues up a task on the shared thread pool.  The message is reserved for a log entry that is not written yet.
     *
     * @param <T> type returned from the submitted task
     * @param task submitted task
     * @param message logged message (currently unused)
     * @return future on which the desired value is retrieved by calling get()
     */
    public static <T> Future<T> submit(Callable<T> task, String message) {
        // TODO log that the task was submitted to the shared thread pool, including the message
        // Deliberately not synchronized: a caller-supplied pool may run the task in the submitting thread.
        return getThreadPool().submit(task);
    }

    /**
     * Queues up a task on the shared thread pool with an empty message.
     *
     * @param <T> type returned from the submitted task
     * @param task submitted task
     * @return future on which the desired value is retrieved by calling get()
     */
    public static <T> Future<T> submit(Callable<T> task) {
        return submit(task, "");
    }

}