分享免费的编程资源和教程

网站首页 > 技术教程 正文

泛型并发对象池

goqiw 2024-09-22 09:48:39 技术教程 34 ℃ 0 评论

了解如何在 Java 中创建对象池

每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?3分钟?学习?何乐而不为?,希望?大家?点赞?,评论,加?关注?,你的?支持?是我?最大?的?动力?。
在这篇文章中,我们将看看如何在 Java 中创建对象池。近年来,JVM 的性能已经成倍增长,对于大多数类型的对象来说,为了获得更好的性能而建立对象池几乎是多余的。从本质上讲,对象的创建不再像以前那样昂贵。

然而,有些物品在创造之初就被证明是昂贵的。线程、数据库连接对象等对象不是轻量级对象,创建它们的成本稍微高一些。在任何应用程序中,我们都需要使用上述类型的多个对象。因此,如果有一种简单的方法来创建和维护这种类型的对象池,以便可以动态地使用和重用对象,而不会让客户机代码受到对象的活动周期的干扰,那就太好了。

在实际编写对象池的代码之前,让我们确定任何对象池都必须满足的主要要求。

  • 如果有可用的对象,则池必须允许客户机使用对象
  • 一旦对象被客户机返回到池中,它必须重用这些对象
  • 如果需要,它必须能够创建更多的对象来满足客户端不断增长的需求
  • 它必须提供适当的关闭机制,以便在关闭时不会发生内存泄漏

不用说,以上几点将构成我们将公开给客户端的接口的基础。

我们的接口声明如下:

package com.test.pool;


/**
 * Represents a cached pool of objects.
 * 
 * @author Swaranga
 *
 * @param < T > the type of object to pool.
 */
public interface Pool< T >
{
 /**
  * Returns an instance from the pool. 
  * The call may be a blocking one or a non-blocking one 
  * and that is determined by the internal implementation.
  * 
  * If the call is a blocking call, 
  * the call returns immediately with a valid object 
  * if available, else the thread is made to wait 
  * until an object becomes available.
  * In case of a blocking call, 
  * it is advised that clients react 
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  * 
  * If the call is a non-blocking one, 
  * the call returns immediately irrespective of 
  * whether an object is available or not.
  * If any object is available the call returns it 
  * else the call returns < code >null< /code >.
  * 
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that 
  * an object < code >o< /code > is valid if 
  * < code > Validator.isValid(o) == true < /code >.
  * 
  * @return T one of the pooled objects.
  */
 T get();
 
 /**
  * Releases the object and puts it back to the pool.
  * 
  * The mechanism of putting the object back to the pool is
  * generally asynchronous, 
  * however future implementations might differ.
  * 
  * @param t the object to return to the pool
  */
  
 void release(T t);
 
 /**
  * Shuts down the pool. In essence this call will not 
  * accept any more requests 
  * and will release all resources.
  * Releasing resources are done 
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */
 
 void shutdown();
}


上面的接口有意使之非常简单和通用,以支持任何类型的对象。它提供了从池中获取/返回对象的方法。它还提供了一种关闭机制来释放对象

现在我们尝试创建上述接口的实现。但是在这样做之前,需要注意的是,理想的 release ()方法首先会尝试检查客户机返回的对象是否仍然可重用。如果是,那么它将返回到池中,否则必须丢弃该对象。我们希望 Pool 接口的每个实现都遵循这条规则。因此,在创建具体实现之前,我们先创建一个抽象实现,对后续实现施加此限制。令人惊讶的是,我们的抽象实现将被称为 AbstractPool,它的定义如下:

package com.test.pool;

/**
 * Represents an abstract pool, that defines the procedure
 * of returning an object to the pool.
 * 
 * @author Swaranga
 *
 * @param < T > the type of pooled objects.
 */
abstract class AbstractPool < T > implements Pool < T >
{
 /**
  * Returns the object to the pool. 
  * The method first validates the object if it is
  * re-usable and then puts returns it to the pool.
  * 
  * If the object validation fails, 
  * some implementations
  * will try to create a new one 
  * and put it into the pool; however 
  * this behaviour is subject to change 
  * from implementation to implementation
  * 
  */
 @Override
 public final void release(T t)
 {
  if(isValid(t))
  {
   returnToPool(t);
  }
  else
  {
   handleInvalidReturn(t);
  }
 }
 
 protected abstract void handleInvalidReturn(T t);
 
 protected abstract void returnToPool(T t);
 
 protected abstract boolean isValid(T t);
}

在上面的类中,我们强制要求对象池在将对象返回到池之前验证对象。为了定制它们池的行为,实现可以自由选择它们实现这三个抽象方法的方式。他们将使用自己的逻辑来决定,如何检查一个对象是否可以重用[ valid()方法] ,如果客户端返回的对象无效[ handleInvalidReturn ()方法]怎么办,以及向池返回一个有效对象的实际逻辑[ returToPool ()方法]


现在有了上面的一组类,我们就差不多可以开始具体的实现了。但问题是,由于上述类的设计是为了支持泛型对象池,因此上述类的泛型实现将不知道如何验证对象[因为对象将是泛型的: ——]。因此我们需要其他的东西来帮助我们

我们实际上需要的是一种验证对象的通用方法,这样具体的 Pool 实现就不必担心被验证对象的类型。因此,我们引入了一个新的接口 Validator,它定义了验证对象的方法。我们对 Validator 接口的定义如下:

package com.test.pool;

 /**
  * Represents the functionality to 
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  * 
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   * 
   * @param t the object to check.
   * 
   * @return <code>true</code> 
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);
  
  /**
   * Performs any cleanup activities 
   * before discarding the object.
   * For example before discarding 
   * database connection objects,
   * the pool will want to close the connections. 
   * This is done via the 
   * <code>invalidate()</code> method.
   * 
   * @param t the object to cleanup
   */
  
  public void invalidate(T t);
 }


上面的接口定义了用于检查对象是否有效的方法,以及用于使对象无效的方法。当我们想要丢弃一个对象并清除该实例使用的任何内存时,应该使用无效方法。请注意,这个接口本身没有什么意义,只有在对象池的上下文中使用时才有意义。因此,我们在顶级 Pool 接口中定义了这个接口。这类似于 Map 和 Map。Java 集合库中的条目接口。因此,我们的 Pool 界面如下:

package com.test.pool;


/**
 * Represents a cached pool of objects.
 * 
 * @author Swaranga
 *
 * @param < T > the type of object to pool.
 */
public interface Pool< T >
{
 /**
  * Returns an instance from the pool. 
  * The call may be a blocking one or a non-blocking one 
  * and that is determined by the internal implementation.
  * 
  * If the call is a blocking call, 
  * the call returns immediately with a valid object 
  * if available, else the thread is made to wait 
  * until an object becomes available.
  * In case of a blocking call, 
  * it is advised that clients react 
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  * 
  * If the call is a non-blocking one, 
  * the call returns immediately irrespective of 
  * whether an object is available or not.
  * If any object is available the call returns it 
  * else the call returns < code >null< /code >.
  * 
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that 
  * an object < code >o< /code > is valid if 
  * < code > Validator.isValid(o) == true < /code >.
  * 
  * @return T one of the pooled objects.
  */
 T get();
 
 /**
  * Releases the object and puts it back to the pool.
  * 
  * The mechanism of putting the object back to the pool is
  * generally asynchronous, 
  * however future implementations might differ.
  * 
  * @param t the object to return to the pool
  */
  
 void release(T t);
 
 /**
  * Shuts down the pool. In essence this call will not 
  * accept any more requests 
  * and will release all resources.
  * Releasing resources are done 
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */
 
 void shutdown();

 /**
  * Represents the functionality to 
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  * 
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   * 
   * @param t the object to check.
   * 
   * @return <code>true</code> 
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);
  
  /**
   * Performs any cleanup activities 
   * before discarding the object.
   * For example before discarding 
   * database connection objects,
   * the pool will want to close the connections. 
   * This is done via the 
   * <code>invalidate()</code> method.
   * 
   * @param t the object to cleanup
   */
  
  public void invalidate(T t);
 }
}


我们几乎已经准备好具体实施了。但在此之前,我们需要最后一个武器,这实际上是对象库中最重要的武器。它被称为“创建新对象的能力”。

因为我们的对象池是通用的,它们必须知道如何创建新对象来填充它的池。此功能也必须不依赖于对象池的类型,并且必须是创建新对象的常用方法。实现这一点的方法是使用一个名为 ObjectFactory 的接口,该接口只定义一个方法,即“如何创建新对象”。ObjectFactory 接口如下:

package com.test.pool;

/**
 * Represents the mechanism to create 
 * new objects to be used in an object pool.
 * 
 * @author Swaranga
 *
 * @param < T > the type of object to create. 
 */
public interface ObjectFactory < T >
{
 /**
  * Returns a new instance of an object of type T.
  * 
  * @return T an new instance of the object of type T
  */
 public abstract T createNew();
}

我们最终完成了 helper 类,现在我们将创建 Pool 接口的具体实现。因为我们想要一个可以在并发应用程序中使用的池,所以我们将创建一个阻塞池,在池中没有可用对象时阻塞客户机。阻塞机制将无限期地阻塞,直到一个对象变得可用。这种类型的实现导致了另一种方法的出现,这种方法只会在给定的超时时间段内阻塞,如果在超时之前任何对象变得可用,那么在超时之后返回该对象,而不是永远等待,那么返回一个 null 对象。这个实现类似于 Java 并发 API 的 LinkedBlockingQueue 实现,因此在实现实际类之前,我们公开另一个实现 BlockingPool,它类似于 Java 并发 API 的 BlockingQueue 接口
因此 Blockingpool 接口声明如下:

package com.test.pool;

import java.util.concurrent.TimeUnit;

/**
 * Represents a pool of objects that makes the 
 * requesting threads wait if no object is available.
 * 
 * @author Swaranga
 *
 * @param < T > the type of objects to pool.
 */
public interface BlockingPool < T > extends Pool < T >
{
 /**
  * Returns an instance of type T from the pool.
  * 
  * The call is a blocking call, 
  * and client threads are made to wait
  * indefinitely until an object is available. 
  * The call implements a fairness algorithm 
  * that ensures that a FCFS service is implemented.
  * 
  * Clients are advised to react to InterruptedException. 
  * If the thread is interrupted while waiting 
  * for an object to become available,
  * the current implementations 
  * sets the interrupted state of the thread 
  * to <code>true</code> and returns null. 
  * However this is subject to change 
  * from implementation to implementation.
  * 
  * @return T an instance of the Object 
  * of type T from the pool.
  */
 T get();
 
 /**
  * Returns an instance of type T from the pool, 
  * waiting up to the
  * specified wait time if necessary 
  * for an object to become available..
  * 
  * The call is a blocking call, 
  * and client threads are made to wait
  * for time until an object is available 
  * or until the timeout occurs. 
  * The call implements a fairness algorithm 
  * that ensures that a FCFS service is implemented.
  * 
  * Clients are advised to react to InterruptedException. 
  * If the thread is interrupted while waiting 
  * for an object to become available,
  * the current implementations 
  * set the interrupted state of the thread 
  * to <code>true</code> and returns null. 
  * However this is subject to change 
  * from implementation to implementation.
  *  
  * 
  * @param time amount of time to wait before giving up, 
  *   in units of <tt>unit</tt>
  * @param unit a <tt>TimeUnit</tt> determining 
  *   how to interpret the
  *        <tt>timeout</tt> parameter
  *        
  * @return T an instance of the Object 
  * of type T from the pool.
  *        
  * @throws InterruptedException 
  * if interrupted while waiting
  */
 
 T get(long time, TimeUnit unit) throws InterruptedException;
}

我们的 BoundedBlockingPool 实现如下:

package com.test.pool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public final class BoundedBlockingPool < T > 
 extends AbstractPool < T >
 implements BlockingPool < T >
{
 private int size;
 
 private BlockingQueue < T > objects;
 
 private Validator < T > validator;
 private ObjectFactory < T > objectFactory;
 
 private ExecutorService executor = 
  Executors.newCachedThreadPool();
  
 private volatile boolean shutdownCalled;
 
 public BoundedBlockingPool(
   int size, 
   Validator < T > validator, 
   ObjectFactory < T > objectFactory)
 {
  super();
  
  this.objectFactory = objectFactory;
  this.size = size;
  this.validator = validator;
  
  objects = new LinkedBlockingQueue < T >(size);
  
  initializeObjects();
  
  shutdownCalled = false;
 }
 
 public T get(long timeOut, TimeUnit unit)
 {
  if(!shutdownCalled)
  {
   T t = null;
   
   try
   {
    t = objects.poll(timeOut, unit);
    
    return t;
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
   }
   
   return t;
  }
  
  throw new IllegalStateException(
   "Object pool is already shutdown");
 }
 
 public T get()
 {
  if(!shutdownCalled)
  {
   T t = null;
   
   try
   {
    t = objects.take();
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
   }
   
   return t;
  }
  
  throw new IllegalStateException(
   "Object pool is already shutdown");
 }
 
 public void shutdown()
 {
  shutdownCalled = true;
  
  executor.shutdownNow();
  
  clearResources();
 }
 
 private void clearResources()
 {
  for(T t : objects)
  {
   validator.invalidate(t);
  }
 }
 
 @Override
 protected void returnToPool(T t)
 {
  if(validator.isValid(t))
  {
   executor.submit(new ObjectReturner(objects, t));
  }
 }
 
 @Override
 protected void handleInvalidReturn(T t)
 {
  
 }
 
 @Override
 protected boolean isValid(T t)
 {
  return validator.isValid(t);
 }
 
 private void initializeObjects()
 {
  for(int i = 0; i < size; i++)
  {
   objects.add(objectFactory.createNew());
  }
 }
 
 private class ObjectReturner < E > 
            implements Callable < Void >
 {
  private BlockingQueue < E > queue;
  private E e;
  
  public ObjectReturner(BlockingQueue < E > queue, E e)
  {
   this.queue = queue;
   this.e = e;
  }
  
  public Void call()
  {
   while(true)
   {
    try
    {
     queue.put(e);
     break;
    }
    catch(InterruptedException ie)
    {
     Thread.currentThread().interrupt();
    }
   }
   
   return null;
  }
 }
}


上面是一个由 LinkedBlockingQueue 内部支持的非常基本的对象池。唯一感兴趣的方法是 returToPool ()方法。由于内部存储是一个阻塞池,如果我们试图将返回的元素直接放入 LinkedBlockingPool,那么如果队列已满,它可能会阻塞客户机。但是我们不希望对象池的客户机仅仅为了一些普通的任务(比如将对象返回到池中)而阻塞。因此,我们已经完成了将对象作为异步任务插入到 LinkedBlockingQueue 中并将其提交给 Execator 实例的实际任务,这样客户端线程就可以立即返回。
现在我们将在代码中使用上面的对象池。我们将使用对象池来池一些数据库连接对象。因此,我们将需要一个 Validator 来验证我们的数据库连接对象。
我们的 JDBCConnectionValidator 如下所示:

package com.test;

import java.sql.Connection;
import java.sql.SQLException;

import com.test.pool.Pool.Validator;

public final class JDBCConnectionValidator 
    implements Validator < Connection >
{
 public boolean isValid(Connection con)
 { 
  if(con == null)
  {
   return false;
  }
  
  try
  {
   return !con.isClosed();
  }
  catch(SQLException se)
  {
   return false;
  }
 }
 
 public void invalidate(Connection con)
 {
  try
  {
   con.close();
  }
  catch(SQLException se)
  {
   
  }
 }
}


使对象池能够创建新对象的 JDBCObjectFactory 如下所示:

package com.test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import com.test.pool.ObjectFactory;

public class JDBCConnectionFactory 
 implements ObjectFactory < Connection >
{
 private String connectionURL;
 private String userName;
 private String password;
  
 public JDBCConnectionFactory(
  String driver, 
  String connectionURL, 
  String userName, 
  String password)
        {
         super();
         
         try
         {
          Class.forName(driver);
         }
         catch(ClassNotFoundException ce)
         {
          throw new IllegalArgumentException(
           "Unable to find driver in classpath", ce);
         }
         
         this.connectionURL = connectionURL;
         this.userName = userName;
         this.password = password;
        }
 
 public Connection createNew()
 { 
  try
  {
   return 
       DriverManager.getConnection(
    connectionURL, 
    userName, 
    password);
  }
  catch(SQLException se)
  {
   throw new IllegalArgumentException(
    "Unable to create new connection", se);
  }
 }
}


现在我们使用上面的 Validator 和 ObjectFactory 创建一个 JDBC 对象池:

package com.test;
import java.sql.Connection;

import com.test.pool.Pool;
import com.test.pool.PoolFactory;


public class Main
{
 public static void main(String[] args)
    {
  Pool < Connection > pool = 
   new BoundedBlockingPool < Connection > (
    10, 
    new JDBCConnectionValidator(),
    new JDBCConnectionFactory("", "", "", "")
    );
  
  //do whatever you like
    }
}


作为阅读整篇文章的奖励。我将提供 Pool 接口的另一个实现,它本质上是一个非阻塞对象池。这个实现与前一个实现的唯一区别是,如果某个元素不可用,这个实现不会阻塞客户端,而是返回 null。开始了:

package com.test.pool;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;

public class BoundedPool < T > 
 extends AbstractPool < T >
{
 private int size;
 
 private Queue < T > objects;
 
 private Validator < T > validator;
 private ObjectFactory < T > objectFactory;
 
 private Semaphore permits;
  
 private volatile boolean shutdownCalled;
 
 public BoundedPool(
  int size, 
  Validator < T > validator, 
  ObjectFactory < T > objectFactory)
 {
  super();
  
  this.objectFactory = objectFactory;
  this.size = size;
  this.validator = validator;
  
  objects = new LinkedList < T >();
  
  initializeObjects();
  
  shutdownCalled = false;
 }
 
 
 @Override
 public T get()
 {
  T t = null;
  
  if(!shutdownCalled)
  {
   if(permits.tryAcquire())
   {
    t = objects.poll();
   }
  }
  else
  {
   throw new IllegalStateException(
    "Object pool already shutdown");
  }
  
  return t;
 }

 @Override
 public void shutdown()
 {
  shutdownCalled = true;
  
  clearResources();
 }
 
 private void clearResources()
 {
  for(T t : objects)
  {
   validator.invalidate(t);
  }
 }

 @Override
 protected void returnToPool(T t)
 {
  boolean added = objects.add(t);
  
  if(added)
  {
   permits.release();
  }
 }
 
 @Override
 protected void handleInvalidReturn(T t)
 {
  
 }

 @Override
 protected boolean isValid(T t)
 {
  return validator.isValid(t);
 }
 
 private void initializeObjects()
 {
  for(int i = 0; i < size; i++)
  {
   objects.add(objectFactory.createNew());
  }
 }
}


考虑到我们现在有两个强大的实现,最好让用户通过具有有意义名称的工厂来创建池。这是工厂:

package com.test.pool;

import com.test.pool.Pool.Validator;

/**
 * Factory and utility methods for 
 * {@link Pool} and {@link BlockingPool} classes 
 * defined in this package. 
 * This class supports the following kinds of methods:
 *
 * <ul>
 *   <li> Method that creates and returns a default non-blocking 
 *        implementation of the {@link Pool} interface.
 *   </li>
 *   
 *   <li> Method that creates and returns a 
 *        default implementation of 
 *        the {@link BlockingPool} interface.
 *   </li>
 * </ul>
 *
 * @author Swaranga
 */

public final class PoolFactory
{
 private PoolFactory()
 {
  
 }
 
 /**
  * Creates a and returns a new object pool,
  * that is an implementation of the {@link BlockingPool}, 
  * whose size is limited by
  * the <tt> size </tt> parameter.
  * 
  * @param size the number of objects in the pool.
  * @param factory the factory to create new objects.
  * @param validator the validator to 
  * validate the re-usability of returned objects.
  * 
  * @return a blocking object pool
  * bounded by <tt> size </tt>
  */
 public static < T > Pool < T > 
  newBoundedBlockingPool(
      int size, 
      ObjectFactory < T > factory, 
      Validator < T > validator)
 {
  return new BoundedBlockingPool < T > (
                                    size, 
                                    validator,
                                    factory);
 }
 
 /**
  * Creates a and returns a new object pool,
  * that is an implementation of the {@link Pool} 
  * whose size is limited 
  * by the <tt> size </tt> parameter.
  * 
  * @param size the number of objects in the pool.
  * @param factory the factory to create new objects.
  * @param validator the validator to validate 
  * the re-usability of returned objects.
  * 
  * @return an object pool bounded by <tt> size </tt>
  */
 
 public static < T > Pool < T > newBoundedNonBlockingPool(
  int size, 
  ObjectFactory < T > factory, 
  Validator < T > validator)
 {
  return new BoundedPool < T >(size, validator, factory);
 }
}


因此,我们的客户端现在可以以更易读的方式创建对象池:

package com.test;
import java.sql.Connection;

import com.test.pool.Pool;
import com.test.pool.PoolFactory;


public class Main
{
 public static void main(String[] args)
    {
  Pool < Connection > pool = 
   PoolFactory.newBoundedBlockingPool(
    10, 
    new JDBCConnectionFactory("", "", "", ""), 
    new JDBCConnectionValidator());
  
  //do whatever you like
    }
}


我们的长篇大论到此结束。这个早就该做了。请随意使用、更改、添加更多实现。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表