Archive

Posts Tagged ‘thread’

Java: Multiple Thread Controller

September 10th, 2009 No comments

This example is for those of you writing server-side Java daemons who want to take full control over how much CPU is used on multi-core machines. Essentially you have a controller object which, when run, creates maxThreads “thread workers” (this is your thread pool). Each worker can be assigned generic code of your choice in the form of an IRunnableThread, which is basically a Java Runnable object.

ThreadTester is a basic implementation of an IRunnableThread which contains a simple unit test to show the thread controller working. Of course you can swap out ThreadTester for your own implementation.

The Daemon main statement shows how to start and use the thread controller at the uppermost level. The argument to the thread controller is the number of threads you require for simultaneous processing; in this example it's 3.

package ThreadControl;

import java.util.LinkedList;
import ThreadControl.ThreadWorker;

/**
 * Owns a fixed-size set of {@link ThreadWorker}s and the shared task queue
 * they consume from.
 *
 * <p>Construct the controller with the desired number of workers, run it
 * (directly or on its own thread) to start the workers, then hand tasks to
 * {@link #push(IRunnableThread)}.
 *
 * <p>NOTE: the task queue is a static field that the constructor replaces,
 * so constructing a second controller swaps the queue out from under any
 * workers started by an earlier one. Use one controller per JVM.
 */
public class ThreadController implements Runnable{

    /** Number of worker threads created by {@link #run()}. */
    private final int maxThreads;

    /** Worker slots; populated by {@link #run()}. */
    private final ThreadWorker[] threads;

    /**
     * Pending tasks. This object is also the monitor that workers wait on,
     * so every access must happen while holding its lock.
     */
    public static LinkedList<IRunnableThread> queue;

    /**
     * Creates a controller for {@code maxThreads} workers and installs a
     * fresh, empty task queue. No threads are started until {@link #run()}.
     *
     * @param maxThreads number of worker threads to create; a negative
     *                   value fails with {@link NegativeArraySizeException}
     */
    public ThreadController(int maxThreads){
        this.maxThreads = maxThreads;
        queue = new LinkedList<IRunnableThread>();
        threads = new ThreadWorker[maxThreads];
    }

    /**
     * Returns the number of tasks currently waiting in the queue.
     *
     * @return pending task count (tasks already taken by a worker are not counted)
     */
    public static int queueSize(){
        // LinkedList is not thread-safe; read it under the same lock that
        // push() and the workers use.
        synchronized(queue) {
            return queue.size();
        }
    }

    /**
     * Returns the number of worker threads this controller was configured
     * with. Despite the name, this is the pool size, not a queue limit.
     *
     * @return the {@code maxThreads} value passed to the constructor
     */
    public int maxQueueSize(){
        return maxThreads;
    }

    /**
     * Appends a task to the queue and wakes one waiting worker.
     *
     * @param threadCode the task to execute
     */
    public void push(IRunnableThread threadCode){
        // notify() must be called while holding the queue's own monitor;
        // locking the controller instance instead would raise
        // IllegalMonitorStateException.
        synchronized(queue) {
            queue.addLast(threadCode);
            queue.notify();
        }
    }

    /** Creates and starts all {@code maxThreads} workers. */
    @Override
    public void run() {
        for (int i=0; i<maxThreads; i++) {
            threads[i] = new ThreadWorker();
            threads[i].start();
        }
    }

}

The implementation for one of the thread workers that is part of your thread pool of size: maxThreads

package ThreadControl;

import ThreadControl.ThreadController;

/**
 * A pool thread that repeatedly takes the next task from
 * {@code ThreadController.queue} and runs it.
 *
 * <p>The worker blocks while the queue is empty. Interrupting the worker
 * while it is waiting for work makes it exit its loop, which provides a way
 * to shut the pool down.
 */
public class ThreadWorker extends Thread{

    /**
     * Runs the take-and-execute loop until the thread is interrupted while
     * waiting for work.
     */
    @Override
    public void run(){
        IRunnableThread r;

        while (true){
            synchronized(ThreadController.queue){
                // Loop rather than a single check: wait() may return without
                // an item being available for this particular worker.
                while(ThreadController.queue.isEmpty()){
                    try{
                        ThreadController.queue.wait();
                    }
                    catch (InterruptedException e){
                        // Treat interruption as a stop request. Restore the
                        // flag for anyone inspecting the thread and leave;
                        // looping again would only re-throw immediately.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                r = ThreadController.queue.removeFirst();
            }
            // Run the task outside the lock so other workers can dequeue.
            // If we don't catch RuntimeException,
            // the pool could leak threads
            try{
                r.setThreadId(Long.toString(this.getId()));
                r.run();
            }
            catch (RuntimeException e) {
                // Keep the worker alive, but do not hide the failure.
                System.err.println("ThreadWorker " + this.getId() + ": task failed: " + e);
            }
        }
    }

}

Runnable thread interface

package ThreadControl;

/**
 * A {@link Runnable} task that can additionally carry a thread identifier.
 *
 * <p>{@code ThreadWorker} calls {@link #setThreadId(String)} with its own
 * thread id immediately before invoking {@link #run()}.
 */
public interface IRunnableThread extends Runnable {

    /**
     * Returns the thread identifier associated with this task.
     *
     * @return the identifier held by the implementation
     */
    String getThreadId();

    /**
     * Associates a thread identifier with this task.
     *
     * @param threadId identifier of the thread about to run the task
     */
    void setThreadId(String threadId);

}

Test implementation for IRunnableThread,

package ThreadControl;

/**
 * Demo {@link IRunnableThread} that prints a fixed number of progress
 * messages, pausing between them, so the behaviour of the pool can be
 * observed on standard output.
 */
public class ThreadTester implements IRunnableThread{

    /** How many messages a single run prints. */
    private final static int MESSAGE_COUNT = 5;

    /** Pause after each message, in milliseconds. */
    private int messageDelay = 1000;

    /** Label identifying this test in the output. */
    private String testId = "";

    /** Identifier assigned through {@link #setThreadId(String)}. */
    private String threadId = "";

    /**
     * Creates a tester that uses the default delay of 1000 ms.
     *
     * @param testId label printed with every message
     */
    public ThreadTester(String testId){
        this.testId = testId;
    }

    /**
     * Creates a tester with a custom delay.
     *
     * @param testId       label printed with every message
     * @param messageDelay pause after each message, in milliseconds
     */
    public ThreadTester(String testId,int messageDelay){
        this(testId);
        this.messageDelay = messageDelay;
    }

    /**
     * Prints the progress messages and the current queue size, sleeping
     * after each pair, then prints a completion line.
     */
    public void run(){
        int messageNumber = 1;
        while (messageNumber <= MESSAGE_COUNT){
            String progress = "Thread Id: " + threadId + " - [" + testId + "]: test message #" + messageNumber + "\n";
            System.out.print(progress);
            String backlog = "Thread Queue Size: " + ThreadController.queueSize() + "\n";
            System.out.print(backlog);
            pause();
            messageNumber++;
        }
        System.out.print("*** " + testId + " complete! ***\n");
    }

    /** Sleeps for the configured delay; an interruption is only reported. */
    private void pause(){
        try{
            Thread.sleep(messageDelay);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** @return the identifier last given to {@link #setThreadId(String)} */
    public String getThreadId() {
        return this.threadId;
    }

    /** @param threadId identifier to print in subsequent messages */
    public void setThreadId(String threadId) {
        this.threadId = threadId;
    }

}

Main statement where the test case is demonstrated

import ThreadControl.ThreadController;
import ThreadControl.ThreadTester;

/**
 * Entry point demonstrating the thread controller: starts a pool of three
 * workers and submits five test tasks with different message delays.
 */
public class Daemon {

    /**
     * Starts the controller on its own thread and queues the demo tasks.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ThreadController threadController = new ThreadController(3);

        try{
            Thread launcher = new Thread(threadController);
            launcher.start();
        }
        catch (Exception e) {
            e.printStackTrace();
        }

        // Label / delay (ms) pairs, submitted in this order.
        String[] labels = {"Test 1", "Test 2", "Test 3", "Test 4", "Test 5"};
        int[] delays = {5000, 333, 777, 450, 1024};

        for (int i = 0; i < labels.length; i++) {
            ThreadTester job = new ThreadTester(labels[i], delays[i]);
            threadController.push(job);
        }
    }

}
Categories: Code, General, Java Tags: , ,