Thread Pool Manager (Dynamically Sized by the System's CPU Load)


Hi,

 

This is a custom thread pool manager for running tasks in a multi-threaded environment. It periodically adjusts the core sizes of its two thread pools (one for scheduled tasks, one for instant tasks) based on the system's CPU load and the number of available cores. The aim is to keep the pools matched to the current load: more threads while the CPU is mostly idle, fewer while it is busy. If a calculated size equals the pool's current core size, nothing is changed, since the pool is already at the size chosen for that load. I haven't tested it extensively yet, so feel free to test it thoroughly; if you run into any issues, I can make the necessary modifications. The code is commented throughout.

 

For testing purposes, you can shorten the update interval to 10 seconds and log the result even when the size does not change:

 

		scheduleAtFixedRate(() ->
		{
			try
			{
				purge();
				updateThreadPoolSizes();
			}
			catch (Exception e)
			{
				LOGGER.severe("Error during scheduled ThreadPool update: " + e.getMessage());
			}
		}, 10, 10, TimeUnit.SECONDS);

 

	private static void updateThreadPoolSizes()
	{
		double systemLoad = getSystemCpuLoad();
		int availableCores = Runtime.getRuntime().availableProcessors();
		int newScheduledThreadPoolSize = calculateThreadPoolSize(availableCores, systemLoad, true);
		int newInstantThreadPoolSize = calculateThreadPoolSize(availableCores, systemLoad, false);
		
		if (newScheduledThreadPoolSize != scheduledExecutor.getCorePoolSize())
		{
			scheduledExecutor.setCorePoolSize(newScheduledThreadPoolSize);
			LOGGER.info("Updated scheduled thread pool size to " + newScheduledThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
		else
		{
			LOGGER.info("Scheduled thread pool size remains at " + newScheduledThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
		if (newInstantThreadPoolSize != instantExecutor.getCorePoolSize())
		{
			instantExecutor.setCorePoolSize(newInstantThreadPoolSize);
			LOGGER.info("Updated instant thread pool size to " + newInstantThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
		else
		{
			LOGGER.info("Instant thread pool size remains at " + newInstantThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
	}
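
To see the resize check in isolation, here is a minimal sketch that applies it to a plain `ThreadPoolExecutor`. The class `ResizeDemo` and its helper methods are made up for this illustration and are not part of the posted code; only the `java.util.concurrent` calls are real. The pool is shaped like the instant executor in the post (unbounded queue, `Integer.MAX_VALUE` as the maximum size).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the posted ThreadPool.
public class ResizeDemo
{
	// Applies a new core size only when it differs from the current one,
	// mirroring the check in updateThreadPoolSizes().
	// Returns true if the pool was actually resized.
	public static boolean resizeIfNeeded(ThreadPoolExecutor pool, int newSize)
	{
		if (newSize == pool.getCorePoolSize())
		{
			return false;
		}
		pool.setCorePoolSize(newSize);
		return true;
	}
	
	// Builds a pool with an unbounded queue and a very large maximum size.
	public static ThreadPoolExecutor newPool(int coreSize)
	{
		return new ThreadPoolExecutor(coreSize, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
	}
	
	public static void main(String[] args)
	{
		ThreadPoolExecutor pool = newPool(8);
		System.out.println("Resized: " + resizeIfNeeded(pool, 8) + ", core size: " + pool.getCorePoolSize());
		System.out.println("Resized: " + resizeIfNeeded(pool, 16) + ", core size: " + pool.getCorePoolSize());
		pool.shutdown();
	}
}
```

When the core size is lowered, the excess threads are not killed immediately; they terminate the next time they become idle.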

 

P.S.: The pool sizes never go below these minimums (16 threads for the scheduled pool, 8 for the instant pool):
 

return Math.max(isScheduledPool ? 16 : 8, threadPoolSize);
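
To get a feel for how this floor interacts with the load thresholds, the sizing rule can be run on its own. The sketch below is a standalone copy of `calculateThreadPoolSize` from the full code; the wrapper class `PoolSizing` and the sample core counts and load values are made up for experimentation.

```java
// Hypothetical standalone copy of the sizing rule, for experimentation only.
public class PoolSizing
{
	public static int calculateThreadPoolSize(int availableCores, double systemLoad, boolean isScheduledPool)
	{
		double factor;
		if (systemLoad <= 0.4)
		{
			factor = isScheduledPool ? 4 : 2;
		}
		else if (systemLoad <= 0.6)
		{
			factor = isScheduledPool ? 3 : 1.5;
		}
		else if (systemLoad <= 0.8)
		{
			factor = isScheduledPool ? 2 : 1;
		}
		else
		{
			factor = 0.5;
		}
		
		int threadPoolSize = (int) Math.round(availableCores * factor);
		// The floor: never fewer than 16 scheduled threads or 8 instant threads.
		return Math.max(isScheduledPool ? 16 : 8, threadPoolSize);
	}
	
	public static void main(String[] args)
	{
		int[] coreCounts = {4, 8, 16}; // sample machines (assumed values)
		double[] loads = {0.2, 0.5, 0.7, 0.9}; // one sample per load bracket
		for (int cores : coreCounts)
		{
			for (double load : loads)
			{
				System.out.println(cores + " cores, load " + load + " -> scheduled " + calculateThreadPoolSize(cores, load, true) + ", instant " + calculateThreadPoolSize(cores, load, false));
			}
		}
	}
}
```

With 4 cores or fewer, the floor wins in every load bracket (4 x 4 = 16 and 4 x 2 = 8 are the largest values the formula can produce), so the pools stay at 16 and 8 and no resize ever happens. The adjustment only becomes visible on machines with more than 4 cores.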

 

The initial code:
 

/*
 * This file is part of the L2Gold Classic project.
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
package gold.lineage2.commons.threads;

import java.lang.management.ManagementFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.sun.management.OperatingSystemMXBean;

/**
 * A custom implementation of a thread pool management system, designed to efficiently manage the execution of tasks in a multi-threaded environment.
 * Automatically adjusts the size of the thread pools based on the system's CPU load and the number of available cores. This dynamic adjustment ensures
 * that the thread pools are always optimized for the current system load, resulting in efficient resource utilization and improved performance.
 * @author Trance
 */
public class ThreadPool
{
	// Logger to log information and errors.
	private static final Logger LOGGER = Logger.getLogger(ThreadPool.class.getName());
	// Maximum delay to be used to validate the delay.
	private static final long MAX_DELAY = TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE - System.nanoTime()) / 2;
	
	// ScheduledThreadPoolExecutor for scheduled tasks.
	private static ScheduledThreadPoolExecutor scheduledExecutor;
	// ThreadPoolExecutor for instant tasks.
	private static ThreadPoolExecutor instantExecutor;
	
	/**
	 * Initialize the ThreadPool with the appropriate sizes.
	 */
	public static void init()
	{
		LOGGER.info("ThreadPool: Initialized");
		
		int availableCores = Runtime.getRuntime().availableProcessors();
		int scheduledThreadPoolSize = availableCores * 4;
		int instantThreadPoolSize = availableCores * 2;
		
		scheduledExecutor = new ScheduledThreadPoolExecutor(scheduledThreadPoolSize, new PriorityThreadFactory("ScheduledThreadPool", Thread.NORM_PRIORITY), new ThreadPoolExecutor.CallerRunsPolicy());
		scheduledExecutor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
		scheduledExecutor.prestartAllCoreThreads();
		LOGGER.info("Scheduled thread pool size: " + scheduledThreadPoolSize);
		
		instantExecutor = new ThreadPoolExecutor(instantThreadPoolSize, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new PriorityThreadFactory("ThreadPoolExecutor", Thread.NORM_PRIORITY), new ThreadPoolExecutor.CallerRunsPolicy());
		instantExecutor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
		instantExecutor.prestartAllCoreThreads();
		LOGGER.info("Instant thread pool size: " + instantThreadPoolSize);
		
		scheduleAtFixedRate(() ->
		{
			try
			{
				purge();
				updateThreadPoolSizes();
			}
			catch (Exception e)
			{
				LOGGER.severe("Error during scheduled ThreadPool update: " + e.getMessage());
			}
		}, 5, 5, TimeUnit.MINUTES);
	}
	
	/**
	 * Purge both thread pools.
	 */
	public static void purge()
	{
		scheduledExecutor.purge();
		instantExecutor.purge();
	}
	
	/**
	 * Validate the delay ensuring it's within acceptable bounds.
	 * @param delay The delay value to be validated.
	 * @param timeUnit The time unit of the delay.
	 * @return A validated delay value.
	 */
	private static long validate(long delay, TimeUnit timeUnit)
	{
		long delayInMilliseconds = timeUnit.toMillis(delay);
		long validatedDelay = Math.max(0, Math.min(MAX_DELAY, delayInMilliseconds));
		
		if (delayInMilliseconds > validatedDelay)
		{
			return -1;
		}
		
		// Return the clamped value so that a negative delay becomes 0 instead of being passed through.
		return timeUnit.convert(validatedDelay, TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Schedule a Runnable with a specific delay.
	 * @param r The Runnable to be scheduled.
	 * @param delay The delay before the Runnable is executed, in milliseconds.
	 * @return A ScheduledFuture representing the pending result of the task.
	 */
	public static ScheduledFuture<?> schedule(Runnable r, long delay)
	{
		return schedule(r, delay, TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Schedule a Runnable with a specific delay and TimeUnit.
	 * @param r The Runnable to be scheduled.
	 * @param delay The delay before the Runnable is executed.
	 * @param timeUnit The time unit of the delay.
	 * @return A ScheduledFuture representing the result of the scheduling.
	 */
	private static ScheduledFuture<?> schedule(Runnable r, long delay, TimeUnit timeUnit)
	{
		delay = validate(delay, timeUnit);
		if (delay == -1)
		{
			return null;
		}
		
		return scheduledExecutor.schedule(new RunnableWrapper(r), delay, timeUnit);
	}
	
	/**
	 * Schedule a Runnable at a fixed rate with an initial delay.
	 * @param r The Runnable to be scheduled.
	 * @param initial The initial delay before the Runnable is executed for the first time.
	 * @param delay The delay between the execution of the Runnable in subsequent runs.
	 * @return A ScheduledFuture representing the result of the scheduling.
	 */
	public static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long initial, long delay)
	{
		return scheduleAtFixedRate(r, initial, delay, TimeUnit.MILLISECONDS);
	}
	
	/**
	 * Schedule a Runnable at a fixed rate with an initial delay and TimeUnit.
	 * @param r The Runnable to be scheduled.
	 * @param initial The initial delay before the Runnable is executed for the first time.
	 * @param delay The delay between the execution of the Runnable in subsequent runs.
	 * @param timeUnit The time unit of the delay.
	 * @return A ScheduledFuture representing the result of the scheduling.
	 */
	private static ScheduledFuture<?> scheduleAtFixedRate(Runnable r, long initial, long delay, TimeUnit timeUnit)
	{
		initial = validate(initial, timeUnit);
		if (initial == -1)
		{
			return null;
		}
		
		delay = validate(delay, timeUnit);
		if (delay == -1)
		{
			return scheduledExecutor.schedule(new RunnableWrapper(r), initial, timeUnit);
		}
		
		return scheduledExecutor.scheduleAtFixedRate(new RunnableWrapper(r), initial, delay, timeUnit);
	}
	
	/**
	 * Execute a Runnable instantly.
	 * @param r The Runnable to be executed.
	 */
	public static void execute(Runnable r)
	{
		instantExecutor.execute(r);
	}
	
	/**
	 * Shut down both thread pools.
	 * @throws InterruptedException If the shutdown process is interrupted.
	 */
	public static void shutdown() throws InterruptedException
	{
		try
		{
			scheduledExecutor.shutdown();
			if (!scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS))
			{
				scheduledExecutor.shutdownNow();
			}
		}
		finally
		{
			instantExecutor.shutdown();
			if (!instantExecutor.awaitTermination(1, TimeUnit.MINUTES))
			{
				instantExecutor.shutdownNow();
			}
		}
	}
	
	/**
	 * Update the thread pool sizes based on system load.
	 */
	private static void updateThreadPoolSizes()
	{
		double systemLoad = getSystemCpuLoad();
		int availableCores = Runtime.getRuntime().availableProcessors();
		int newScheduledThreadPoolSize = calculateThreadPoolSize(availableCores, systemLoad, true);
		int newInstantThreadPoolSize = calculateThreadPoolSize(availableCores, systemLoad, false);
		
		if (newScheduledThreadPoolSize != scheduledExecutor.getCorePoolSize())
		{
			scheduledExecutor.setCorePoolSize(newScheduledThreadPoolSize);
			LOGGER.info("Updated scheduled thread pool size to " + newScheduledThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
		if (newInstantThreadPoolSize != instantExecutor.getCorePoolSize())
		{
			instantExecutor.setCorePoolSize(newInstantThreadPoolSize);
			LOGGER.info("Updated instant thread pool size to " + newInstantThreadPoolSize + ", CPU: " + getSystemCpuLoadPercentage());
		}
	}
	
	/**
	 * Calculate the thread pool size based on available cores, system load, and whether it's a scheduled pool.
	 * @param availableCores The number of available processor cores.
	 * @param systemLoad The current system load.
	 * @param isScheduledPool A boolean indicating if the pool is a scheduled thread pool or not.
	 * @return The calculated thread pool size.
	 */
	private static int calculateThreadPoolSize(int availableCores, double systemLoad, boolean isScheduledPool)
	{
		double factor;
		if (systemLoad <= 0.4)
		{
			factor = isScheduledPool ? 4 : 2;
		}
		else if (systemLoad <= 0.6)
		{
			factor = isScheduledPool ? 3 : 1.5;
		}
		else if (systemLoad <= 0.8)
		{
			factor = isScheduledPool ? 2 : 1;
		}
		else
		{
			factor = 0.5;
		}
		
		int threadPoolSize = (int) Math.round(availableCores * factor);
		return Math.max(isScheduledPool ? 16 : 8, threadPoolSize);
	}
	
	/**
	 * Get the system CPU load.
	 * @return A double value representing the system CPU load.
	 */
	@SuppressWarnings("deprecation")
	private static double getSystemCpuLoad()
	{
		OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
		return osBean.getSystemCpuLoad();
	}
	
	/**
	 * Get the system CPU load as a percentage.
	 * @return A string representing the system CPU load percentage.
	 */
	private static String getSystemCpuLoadPercentage()
	{
		double cpuLoad = getSystemCpuLoad();
		return String.format("%.2f%%", cpuLoad * 100);
	}
}
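
One case worth guarding against: `getSystemCpuLoad()` is documented to return a negative value when the CPU load is not available. With the code above, a negative reading falls into the `systemLoad <= 0.4` branch and selects the largest pool sizes. Below is a minimal sketch of a guard. The class `CpuLoadGuard`, the helper `sanitizeLoad`, and the fallback value of 0.5 are assumptions made for illustration, not part of the original code; treating NaN as "unavailable" is an extra precaution.

```java
import java.lang.management.ManagementFactory;

import com.sun.management.OperatingSystemMXBean;

// Hypothetical helper, not part of the posted ThreadPool.
public class CpuLoadGuard
{
	// Replaces unusable readings (negative or NaN) with a fallback
	// and caps anything above 1.0, so callers always get a value in [0.0, 1.0].
	public static double sanitizeLoad(double rawLoad, double fallback)
	{
		if (Double.isNaN(rawLoad) || (rawLoad < 0.0))
		{
			return fallback;
		}
		return Math.min(1.0, rawLoad);
	}
	
	// Reads the system CPU load the same way as the post does, then sanitizes it.
	// The 0.5 fallback is an arbitrary middle value (assumption).
	@SuppressWarnings("deprecation")
	public static double readSystemCpuLoad()
	{
		OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
		return sanitizeLoad(osBean.getSystemCpuLoad(), 0.5);
	}
	
	public static void main(String[] args)
	{
		System.out.println(String.format("System CPU load: %.2f%%", readSystemCpuLoad() * 100));
	}
}
```

On JDK 14 and later, `getCpuLoad()` on the same bean is the non-deprecated replacement for `getSystemCpuLoad()`.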

 

Edited by Trance
14 minutes ago, Kishin said:

What about PriorityThreadFactory?


You don't need to copy and paste everything. Instead, you can use the same logic to implement it in your Thread Pool Manager. However, I am providing you with the files: https://files.lineage2.gold/threads.zip
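
For anyone who only wants the general idea: the three helper classes that `ThreadPool` refers to (`PriorityThreadFactory`, `RunnableWrapper` and `RejectedExecutionHandlerImpl`) are not shown in this thread. The sketch below is only a guess at what minimal versions could look like, based on how they are used in the code above; it is not the content of the linked archive. They are nested in a made-up `ThreadHelpers` class so that everything compiles as a single file.

```java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical container class; the real helpers may look different.
public class ThreadHelpers
{
	private static final Logger LOGGER = Logger.getLogger(ThreadHelpers.class.getName());
	
	// Creates named threads with a fixed priority.
	public static class PriorityThreadFactory implements ThreadFactory
	{
		private final String name;
		private final int priority;
		private final AtomicInteger counter = new AtomicInteger(1);
		
		public PriorityThreadFactory(String name, int priority)
		{
			this.name = name;
			this.priority = priority;
		}
		
		@Override
		public Thread newThread(Runnable r)
		{
			Thread thread = new Thread(r, name + "-" + counter.getAndIncrement());
			thread.setPriority(priority);
			return thread;
		}
	}
	
	// Logs runtime exceptions instead of letting them escape; an exception escaping
	// a scheduleAtFixedRate task would suppress all of its later executions.
	public static class RunnableWrapper implements Runnable
	{
		private final Runnable delegate;
		
		public RunnableWrapper(Runnable delegate)
		{
			this.delegate = delegate;
		}
		
		@Override
		public void run()
		{
			try
			{
				delegate.run();
			}
			catch (RuntimeException e)
			{
				LOGGER.log(Level.SEVERE, "Uncaught exception in task", e);
			}
		}
	}
	
	// Runs a rejected task on the caller's thread unless the pool is shut down.
	public static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler
	{
		@Override
		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
		{
			if (executor.isShutdown())
			{
				return;
			}
			LOGGER.warning("Task rejected by " + executor + ", running it on the calling thread.");
			r.run();
		}
	}
	
	public static void main(String[] args)
	{
		Thread thread = new PriorityThreadFactory("Demo", Thread.NORM_PRIORITY).newThread(new RunnableWrapper(() -> System.out.println("Hello from " + Thread.currentThread().getName())));
		thread.start();
	}
}
```

The point of a custom thread factory here is mainly readable thread names in logs and thread dumps, plus control over the thread priority.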

1 minute ago, Trance said:


You don't need to copy and paste everything. Instead, you can use the same logic to implement it in your Thread Pool Manager. However, I am providing you with the files: https://files.lineage2.gold/threads.zip

Yeah, I was about to do that, but I was curious about its purpose, since even if I implemented it as it is, I would have to rework the whole source.
Thanks for the file, though. I'll look around and work on these.
