Jump to content

Thread Pool Manager (Dynamically on system's CPU load)


Recommended Posts

Hi,

 

This is a custom implementation of a thread pool management system, designed to efficiently manage the execution of tasks in a multi-threaded environment. Automatically adjusts the size of the thread pools based on the system's CPU load and the number of available cores. This dynamic adjustment ensures that the thread pools are always optimized for the current system load, resulting in efficient resource utilization and improved performance. If the calculated sizes are equal to the current core pool sizes, this means that there's no need to update the pool sizes, as they are already optimized for the current system load. I haven't conducted extensive testing yet; however, the code can be thoroughly tested, and if any issues are encountered, I can make the necessary modifications. Comments were added throughout the code.

 

For testing purposes you can do the following:

 

		scheduleAtFixedRate(() ->
		{
			try
			{
				purge();
				updateThreadPoolSizes();
			}
			catch (Exception e)
			{
				LOGGER.severe("Error during scheduled ThreadPool update: " + e.getMessage());
			}
		}, 10, 10, TimeUnit.SECONDS);

 

	private static void updateThreadPoolSizes()
	{
		double systemLoad = getSystemCpuLoad();
		int availableCores = Runtime.getRuntime().availableProcessors();
		int newScheduledThreadPoolSize = calculateThreadPoolSize(availableCores, systemLoad, true);
		int newInstantThreadPoolSize = calculateThreadPoolSize(availableCores, systemLoad, false);
		
		if (newScheduledThreadPoolSize != scheduledExecutor.getCorePoolSize())
		{
			scheduledExecutor.setCorePoolSize(newScheduledThreadPoolSize);
			LOGGER.info("Updated scheduled thread pool size to " + newScheduledThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
		else
		{
			LOGGER.info("Scheduled thread pool size remains to " + newScheduledThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
		if (newInstantThreadPoolSize != instantExecutor.getCorePoolSize())
		{
			instantExecutor.setCorePoolSize(newInstantThreadPoolSize);
			LOGGER.info("Updated instant thread pool size to " + newInstantThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
		else
		{
			LOGGER.info("Instant thread pool size remains to " + newInstantThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
	}

 

P.S.: This is the lowest it can get:
 

return Math.max(isScheduledPool ? 16 : 8, threadPoolSize);

 

The initial code:
 

 * This file is part of the L2Gold Classic project.
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package gold.lineage2.commons.threads;

import java.lang.management.ManagementFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.sun.management.OperatingSystemMXBean;

/**
 * A custom implementation of a thread pool management system, designed to efficiently manage the execution of tasks in a multi-threaded environment.
 * Automatically adjusts the size of the thread pools based on the system's CPU load and the number of available cores. This dynamic adjustment ensures
 * that the thread pools are always optimized for the current system load, resulting in efficient resource utilization and improved performance.
 * @author Trance
 */
public class ThreadPool
{
	// Logger to log information and errors.
	private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
	// Maximum delay to be used to validate the delay.
	private static final long MAX_DELAY = TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE - System.nanoTime()) / 2;
	
	// ScheduledThreadPoolExecutor for scheduled tasks.
	private static ScheduledThreadPoolExecutor scheduledExecutor;
	// ThreadPoolExecutor for instant tasks.
	private static ThreadPoolExecutor instantExecutor;
	
	/**
	 * Initialize the ThreadPool with the appropriate sizes.
	 */
	public static void init()
	{
		LOGGER.info("ThreadPool: Initialized");
		
		int availableCores = Runtime.getRuntime().availableProcessors();
		int scheduledThreadPoolSize = availableCores * 4;
		int instantThreadPoolSize = availableCores * 2;
		
		scheduledExecutor = new ScheduledThreadPoolExecutor(scheduledThreadPoolSize, new PriorityThreadFactory("ScheduledThreadPool", Thread.NORM_PRIORITY), new ThreadPoolExecutor.CallerRunsPolicy());
		scheduledExecutor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
		scheduledExecutor.prestartAllCoreThreads();
		LOGGER.info("Scheduled thread pool size: " + scheduledThreadPoolSize);
		
		instantExecutor = new ThreadPoolExecutor(instantThreadPoolSize, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new PriorityThreadFactory("ThreadPoolExecutor", Thread.NORM_PRIORITY), new ThreadPoolExecutor.CallerRunsPolicy());
		instantExecutor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
		instantExecutor.prestartAllCoreThreads();
		LOGGER.info("Instant thread pool size: " + instantThreadPoolSize);
		
		scheduleAtFixedRate(() ->
		{
			try
			{
				purge();
				updateThreadPoolSizes();
			}
			catch (Exception e)
			{
				LOGGER.severe("Error during scheduled ThreadPool update: " + e.getMessage());
			}
		}, 5, 5, TimeUnit.MINUTES);
	}
	
	/**
	 * Purge both thread pools.
	 */
	public static void purge()
	{
		scheduledExecutor.purge();
		instantExecutor.purge();
	}
	
	/**
	 * Validate the delay ensuring it's within acceptable bounds.
	 * @param delay The delay value to be validated.
	 * @param timeUnit The time unit of the delay.
	 * @return A validated delay value.
	 */
	private static long validate(long delay, TimeUnit timeUnit)
	{
		long delayInMilliseconds = timeUnit.toMillis(delay);
		long validatedDelay = Math.max(0, Math.min(MAX_DELAY, delayInMilliseconds));
		
		if (delayInMilliseconds > validatedDelay)
		{
			return -1;
		}
		
		return timeUnit.convert(delayInMilliseconds, TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Schedule a Runnable with a specific delay.
	 * @param r The Runnable to be scheduled.
	 * @param delay The delay before the Runnable is executed, in milliseconds.
	 * @return A ScheduledFuture representing the pending result of the task.
	 */
	public static ScheduledFuture<?> schedule(Runnable r, long delay)
	{
		return schedule(r, delay, TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Schedule a Runnable with a specific delay and TimeUnit.
	 * @param r The Runnable to be scheduled.
	 * @param delay The delay before the Runnable is executed.
	 * @param timeUnit The time unit of the delay.
	 * @return A ScheduledFuture representing the result of the scheduling.
	 */
	private static ScheduledFuture<?> schedule(Runnable r, long delay, TimeUnit timeUnit)
	{
		delay = validate(delay, timeUnit);
		if (delay == -1)
		{
			return null;
		}
		
		return scheduledExecutor.schedule(new RunnableWrapper(r), delay, timeUnit);
	}
	
	/**
	 * Schedule a Runnable at a fixed rate with an initial delay.
	 * @param r The Runnable to be scheduled.
	 * @param initial The initial delay before the Runnable is executed for the first time.
	 * @param delay The delay between the execution of the Runnable in subsequent runs.
	 * @return A ScheduledFuture representing the result of the scheduling.
	 */
	public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long initial, long delay)
	{
		return scheduleAtFixedRate(r, initial, delay, TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Schedule a Runnable at a fixed rate with an initial delay and TimeUnit.
	 * @param r The Runnable to be scheduled.
	 * @param initial The initial delay before the Runnable is executed for the first time.
	 * @param delay The delay between the execution of the Runnable in subsequent runs.
	 * @param timeUnit The time unit of the delay.
	 * @return A ScheduledFuture representing the result of the scheduling.
	 */
	private static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long initial, long delay, TimeUnit timeUnit)
	{
		initial = validate(initial, timeUnit);
		if (initial == -1)
		{
			return null;
		}
		
		delay = validate(delay, timeUnit);
		if (delay == -1)
		{
			return scheduledExecutor.schedule(new RunnableWrapper(r), initial, timeUnit);
		}
		
		return scheduledExecutor.scheduleAtFixedRate(new RunnableWrapper(r), initial, delay, timeUnit);
	}
	
	/**
	 * Execute a Runnable instantly.
	 * @param r The Runnable to be executed.
	 */
	public static void execute(Runnable r)
	{
		instantExecutor.execute(r);
	}
	
	/**
	 * Shut down both thread pools.
	 * @throws InterruptedException If the shutdown process is interrupted.
	 */
	public static void shutdown() throws InterruptedException
	{
		try
		{
			scheduledExecutor.shutdown();
			if (!scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS))
			{
				scheduledExecutor.shutdownNow();
			}
		}
		finally
		{
			instantExecutor.shutdown();
			if (!instantExecutor.awaitTermination(1, TimeUnit.MINUTES))
			{
				instantExecutor.shutdownNow();
			}
		}
	}
	
	/**
	 * Update the thread pool sizes based on system load.
	 */
	private static void updateThreadPoolSizes()
	{
		double systemLoad = getSystemCpuLoad();
		int availableCores = Runtime.getRuntime().availableProcessors();
		int newScheduledThreadPoolSize = calculateThreadPoolSize(availableCores, systemLoad, true);
		int newInstantThreadPoolSize = calculateThreadPoolSize(availableCores, systemLoad, false);
		
		if (newScheduledThreadPoolSize != scheduledExecutor.getCorePoolSize())
		{
			scheduledExecutor.setCorePoolSize(newScheduledThreadPoolSize);
			LOGGER.info("Updated scheduled thread pool size to " + newScheduledThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
		if (newInstantThreadPoolSize != instantExecutor.getCorePoolSize())
		{
			instantExecutor.setCorePoolSize(newInstantThreadPoolSize);
			LOGGER.info("Updated instant thread pool size to " + newInstantThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
	}
	
	/**
	 * Calculate the thread pool size based on available cores, system load, and whether it's a scheduled pool.
	 * @param availableCores The number of available processor cores.
	 * @param systemLoad The current system load.
	 * @param isScheduledPool A boolean indicating if the pool is a scheduled thread pool or not.
	 * @return The calculated thread pool size.
	 */
	private static int calculateThreadPoolSize(int availableCores, double systemLoad, boolean isScheduledPool)
	{
		double factor;
		if (systemLoad <= 0.4)
		{
			factor = isScheduledPool ? 4 : 2;
		}
		else if (systemLoad <= 0.6)
		{
			factor = isScheduledPool ? 3 : 1.5;
		}
		else if (systemLoad <= 0.8)
		{
			factor = isScheduledPool ? 2 : 1;
		}
		else
		{
			factor = 0.5;
		}
		
		int threadPoolSize = (int) Math.round(availableCores * factor);
		return Math.max(isScheduledPool ? 16 : 8, threadPoolSize);
	}
	
	/**
	 * Get the system CPU load.
	 * @return A double value representing the system CPU load.
	 */
	@SuppressWarnings("deprecation")
	private static double getSystemCpuLoad()
	{
		OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
		return osBean.getSystemCpuLoad();
	}
	
	/**
	 * Get the system CPU load as a percentage.
	 * @return A string representing the system CPU load percentage.
	 */
	private static String getSystemCpuLoadPercentage()
	{
		double cpuLoad = getSystemCpuLoad();
		return String.format("%.2f%%", cpuLoad * 100);
	}
}

 

Edited by Trance
  • Like 3
  • Thanks 2
  • Upvote 2
Link to comment
Share on other sites

14 minutes ago, Kishin said:

what of  ? 

PriorityThreadFactory


You don't need to copy and paste everything. Instead, you can use the same logic to implement it in your Thread Pool Manager. However, I am providing you with the files: https://files.lineage2.gold/threads.zip

Link to comment
Share on other sites

1 minute ago, Trance said:


You don't need to copy and paste everything. Instead, you can use the same logic to implement it in your Thread Pool Manager. However, I am providing you with the files: https://files.lineage2.gold/threads.zip

ye was about to do that , but was curious of its purpose . since even if i did implement it how it was , i would have to rework the whole src  .
though ty for the file . i'll look around and work around on these 

  • Upvote 1
Link to comment
Share on other sites

Please sign in to comment

You will be able to leave a comment after signing in



Sign In Now


×
×
  • Create New...