`

java BlockingQueue 阻塞队列版多线程消费生产实例

阅读更多

BlockingQueue 为阻塞队列,它的实现形式有许多种,有固定容量的(数组实现)、链表实现的等等。机制都一样:队列已满时,试图放入元素的线程会进入等待;队列为空时,试图取出元素的线程会进入等待。

 

以下是BlockingQueue 阻塞队列版多线程消费生产实例:

 

/**
 * Producer task of the demo: publishes segment names onto a shared
 * {@link BlockingQueue} in two batches separated by a pause.
 *
 * <p>{@code put} blocks while the queue is full, so this task is throttled
 * by how quickly the consumers drain the queue.
 */
public class Fetcher implements Runnable {

	/** Number of segment names published before the pause. */
	private static final int FIRST_BATCH_SIZE = 5;

	/** Number of segment names published after the pause. */
	private static final int SECOND_BATCH_SIZE = 25;

	/** Pause between the two batches, in milliseconds. */
	private static final long PAUSE_MILLIS = 50000L;

	/** Queue shared with the consumers; never reassigned after construction. */
	private final BlockingQueue<String> queue;

	/**
	 * @param queue the queue that produced segment names are put onto
	 */
	public Fetcher(BlockingQueue<String> queue)
	{
		this.queue = queue;
	}

	/**
	 * Publishes the first batch, sleeps, publishes the second batch and then
	 * prints the total number of published segments.
	 *
	 * <p>If the thread is interrupted while blocked on the queue or while
	 * sleeping, the task stops early and restores the interrupt status so the
	 * owner of the thread (for example an executor) can still observe it.
	 */
	@Override
	public void run() {
		try
		{
			publish(0, FIRST_BATCH_SIZE);
			Thread.sleep(PAUSE_MILLIS);
			// The second batch continues the numbering where the first one stopped.
			publish(FIRST_BATCH_SIZE, SECOND_BATCH_SIZE);
			System.out.println(Thread.currentThread().getName()+"thread--------"+(FIRST_BATCH_SIZE+SECOND_BATCH_SIZE));
		}
		catch (InterruptedException e) {
			// Catching InterruptedException clears the flag; set it again instead of hiding the request.
			Thread.currentThread().interrupt();
			System.err.println(Thread.currentThread().getName()+" interrupted, producer stopping early");
		}
	}

	/**
	 * Puts {@code count} consecutive segment names onto the queue, starting at
	 * index {@code firstIndex}, logging each one after it has been queued.
	 *
	 * @throws InterruptedException if interrupted while waiting for queue space
	 */
	private void publish(int firstIndex, int count) throws InterruptedException {
		for (int offset = 0; offset < count; offset++)
		{
			queue.put("segment-name-"+(firstIndex+offset));
			System.out.println("ThreadName : "+Thread.currentThread().getName()+"抓取完成");
		}
	}

}

 

   生产者

 

 

 

/**
 * Consumer task of the demo: takes a fixed number of segment names from a
 * shared {@link BlockingQueue}, pausing before each take.
 *
 * <p>{@code take} blocks while the queue is empty, so this task waits for
 * the producer whenever it gets ahead of it.
 */
public class Indexer implements Runnable {

	/** Number of segment names this task consumes before finishing. */
	private static final int ITEMS_TO_CONSUME = 10;

	/** Delay before each take, in milliseconds. */
	private static final long DELAY_MILLIS = 1000L;

	/** Queue shared with the producer; never reassigned after construction. */
	private final BlockingQueue<String> queue;

	/**
	 * @param queue the queue that segment names are taken from
	 */
	public Indexer(BlockingQueue<String> queue)
	{
		this.queue = queue;
	}

	/**
	 * Consumes {@code ITEMS_TO_CONSUME} segment names, logging each one.
	 *
	 * <p>If the thread is interrupted while sleeping or while blocked on the
	 * queue, the task stops early and restores the interrupt status so the
	 * owner of the thread can still observe it.
	 */
	@Override
	public void run() {
		try{
			for (int consumed = 0; consumed < ITEMS_TO_CONSUME; consumed++)
			{
				Thread.sleep(DELAY_MILLIS);
				String name = queue.take();
				System.out.println("ThreadName : " +Thread.currentThread().getName()+ " 索引创建完成 " +name);  
			}
		}catch (InterruptedException e) {
			// Only interruption is expected here; set the flag again rather than swallowing it.
			Thread.currentThread().interrupt();
			System.err.println(Thread.currentThread().getName()+" interrupted, consumer stopping early");
		}

	}

}

   消费者

 

 

package com.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Entry point of the demo: runs one {@link Fetcher} (producer) and two
 * {@link Indexer}s (consumers) against a single bounded queue.
 */
public class TestConsumer {

	/** Bounded queue shared by the producer and both consumers (capacity 10). */
	private static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);

	/**
	 * Submits the producer and the two consumers to a cached thread pool and
	 * then shuts the pool down so the JVM can exit once the tasks are done.
	 *
	 * @param args unused
	 */
	public static void main(String[] args)
	{
		ExecutorService service = Executors.newCachedThreadPool();

		Fetcher producer = new Fetcher(queue);
		Indexer consumer = new Indexer(queue);
		Indexer consumerSecond = new Indexer(queue);
		service.submit(producer);
		service.submit(consumer);
		service.submit(consumerSecond);

		// No more tasks will be submitted. shutdown() lets the submitted tasks run to
		// completion and then releases the worker threads; without it the idle workers
		// keep the JVM alive after the work is finished.
		service.shutdown();

		try{
			// Kept from the original demo: only delays the main thread's return.
			Thread.sleep(5000);
		}
		catch (InterruptedException e) {
			// Preserve the interrupt status instead of swallowing it.
			Thread.currentThread().interrupt();
		}
	}
}

   主函数

 

 

 

分享到:
评论
1 楼 dacoolbaby 2012-11-14  
您的程序不是线程同步的。。
我要重写你的程序了。。

相关推荐

Global site tag (gtag.js) - Google Analytics