//A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread. //Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable. //Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently. //To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype. //By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason. //Note that the fast path is only possible on threads that extend FastThreadLocalThread, //because it requires a special field to store the necessary state. //An access by any other kind of thread falls back to a regular ThreadLocal. //@param <V> the type of the thread-local variable public class FastThreadLocal<V> { //每个FastThreadLocal都有一个唯一的身份标识ID private final int index; //类初始化时调用,所以默认为variablesToRemoveIndex = 0 //第n个值存放在数组下标为n的位置,下标为0的位置存所有FastThreadLocal<V> private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); } ... } //The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals. //Note that this class is for internal use only and is subject to change at any time. //Use FastThreadLocal unless you know what you are doing. public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { public static final Object UNSET = new Object(); public static int nextVariableIndex() { int index = nextIndex.getAndIncrement(); if (index < 0) { nextIndex.decrementAndGet(); throw new IllegalStateException("too many thread-local indexed variables"); } return index; } ... } //The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals. //Note that this class is for internal use only and is subject to change at any time. //Use FastThreadLocal unless you know what you are doing. class UnpaddedInternalThreadLocalMap { static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>(); static final AtomicInteger nextIndex = new AtomicInteger(); //Used by FastThreadLocal Object[] indexedVariables; ... }
public class FastThreadLocalThread extends Thread { private InternalThreadLocalMap threadLocalMap; public FastThreadLocalThread() { } public FastThreadLocalThread(Runnable target) { super(target); } public FastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); } public FastThreadLocalThread(String name) { super(name); } public FastThreadLocalThread(ThreadGroup group, String name) { super(group, name); } public FastThreadLocalThread(Runnable target, String name) { super(target, name); } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); } public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); } //Returns the internal data structure that keeps the thread-local variables bound to this thread. //Note that this method is for internal use only, and thus is subject to change at any time. public final InternalThreadLocalMap threadLocalMap() { return threadLocalMap; } //Sets the internal data structure that keeps the thread-local variables bound to this thread. //Note that this method is for internal use only, and thus is subject to change at any time. public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { //这个方法会在调用InternalThreadLocalMap.get()方法时被调用 //具体就是通过fastGet()方法设置FastThreadLocalThread一个新创建的InternalThreadLocalMap对象 this.threadLocalMap = threadLocalMap; } }
public class Thread implements Runnable { ... //ThreadLocal values pertaining to this thread. This map is maintained by the ThreadLocal class. ThreadLocal.ThreadLocalMap threadLocals = null; ... } public class ThreadLocal<T> { ... //Returns the value in the current thread's copy of this thread-local variable. //If the variable has no value for the current thread, //it is first initialized to the value returned by an invocation of the #initialValue method. //@return the current thread's value of this thread-local public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } //ThreadLocalMap is a customized hash map suitable only for maintaining thread local values. //No operations are exported outside of the ThreadLocal class. //The class is package private to allow declaration of fields in class Thread. //To help deal with very large and long-lived usages, the hash table entries use WeakReferences for keys. //However, since reference queues are not used, //stale entries are guaranteed to be removed only when the table starts running out of space. static class ThreadLocalMap { //The entries in this hash map extend WeakReference, using its main ref field as the key (which is always a ThreadLocal object). //Note that null keys (i.e. entry.get() == null) mean that the key is no longer referenced, so the entry can be expunged from table. //Such entries are referred to as "stale entries" in the code that follows. static class Entry extends WeakReference<ThreadLocal<?>> { //The value associated with this ThreadLocal. Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } //The initial capacity -- MUST be a power of two. private static final int INITIAL_CAPACITY = 16; //The table, resized as necessary. //table.length MUST always be a power of two. private Entry[] table; //The number of entries in the table. private int size = 0; //The next size value at which to resize. private int threshold; // Default to 0 ... } ... }
//A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread. //Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable. //Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently. //To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype. //By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason. //Note that the fast path is only possible on threads that extend FastThreadLocalThread, //because it requires a special field to store the necessary state. //An access by any other kind of thread falls back to a regular ThreadLocal. //@param <V> the type of the thread-local variable public class FastThreadLocal<V> { //每个FastThreadLocal都有一个唯一的身份标识ID //每个FastThreadLocal对应的V值存储在当前FastThreadLocalThread线程维护的InternalThreadLocalMap的下标为index的位置 private final int index; //类初始化时调用,所以默认为variablesToRemoveIndex = 0 //第n个值存放在数组下标为n的位置,下标为0的位置会存储所有FastThreadLocal<V> private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); public FastThreadLocal() { //每new一个FastThreadLocal,index就会自增1,所以index是FastThreadLocal的唯一身份ID index = InternalThreadLocalMap.nextVariableIndex(); } //Returns the current value for the current thread @SuppressWarnings("unchecked") public final V get() { //首先获取由当前FastThreadLocalThread线程维护的InternalThreadLocalMap InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); //从数组中取出index位置的元素 Object v = threadLocalMap.indexedVariable(index); //如果获取到的元素不是一个UNSET即一个new Object(),则返回该元素 if (v != InternalThreadLocalMap.UNSET) { return (V) v; } //如果获取到的数组元素是缺省对象,则对threadLocalMap在index位置的元素值执行初始化操作 return initialize(threadLocalMap); } private V initialize(InternalThreadLocalMap threadLocalMap) { V v = null; try { //通过initialValue()方法对threadLocalMap在index位置的元素值进行初始化 //initialValue()方法可以被FastThreadLocal<V>的子类重写 v = initialValue(); } catch (Exception e) { PlatformDependent.throwException(e); } //设置threadLocalMap数组在下标index处的元素值 threadLocalMap.setIndexedVariable(index, v); addToVariablesToRemove(threadLocalMap, this); return v; } //Returns the initial value for this thread-local variable. protected V initialValue() throws Exception { return null; } private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { //获取threadLocalMap数组下标为0的元素 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set<FastThreadLocal<?>> variablesToRemove; //将variable添加到数组下标为0位置的Set集合中,以便可以通过remove()方法统一删除 if (v == InternalThreadLocalMap.UNSET || v == null) { //创建FastThreadLocal类型的Set集合 variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); //将variablesToRemove这个Set集合设置到数组下标为0的位置 threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } else { //强转获得Set集合 variablesToRemove = (Set<FastThreadLocal<?>>) v; } variablesToRemove.add(variable); } ... }
//The internal data structure that stores the thread-local variables for Netty and all FastThreadLocals. //Note that this class is for internal use only and is subject to change at any time. //Use FastThreadLocal unless you know what you are doing. public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { public static final Object UNSET = new Object(); private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32; ... private InternalThreadLocalMap() { //设置父类的成员变量indexedVariables的初始值 super(newIndexedVariableTable()); } private static Object[] newIndexedVariableTable() { //初始化一个32个元素的数组 Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE]; //每个元素都是UNSET值 Arrays.fill(array, UNSET); return array; } //index是当前访问的FastThreadLocal在JVM里的索引 //indexedVariables数组是当前线程维护的InternalThreadLocalMap对象在初始化时创建的 public Object indexedVariable(int index) { Object[] lookup = indexedVariables; //直接通过索引来取出对象 return index < lookup.length? lookup[index] : UNSET; } public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { return fastGet((FastThreadLocalThread) thread); } else { return slowGet(); } } private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); if (threadLocalMap == null) { thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); } return threadLocalMap; } private static InternalThreadLocalMap slowGet() { //如果普通线程使用FastThreadLocal其实和普通线程使用ThreadLocal是一样的 //因为此时返回的是一个通过ThreadLocal维护的InternalThreadLocalMap对象 ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; InternalThreadLocalMap ret = slowThreadLocalMap.get(); if (ret == null) { ret = new InternalThreadLocalMap(); slowThreadLocalMap.set(ret); } return ret; } ... } class UnpaddedInternalThreadLocalMap { static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>(); static final AtomicInteger nextIndex = new AtomicInteger(); //Used by FastThreadLocal Object[] indexedVariables; ... UnpaddedInternalThreadLocalMap(Object[] indexedVariables) { this.indexedVariables = indexedVariables; } ... }
public class FastThreadLocal<V> { //每个FastThreadLocal都有一个唯一的身份标识ID //每个FastThreadLocal对应的V值存储在当前FastThreadLocalThread线程维护的InternalThreadLocalMap的下标为index的位置 private final int index; //类初始化时调用,所以默认为variablesToRemoveIndex = 0 //第n个值存放在数组下标为n的位置,下标为0的位置会存储所有FastThreadLocal<V> private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); ... //Set the value for the current thread. public final void set(V value) { if (value != InternalThreadLocalMap.UNSET) { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); setKnownNotUnset(threadLocalMap, value); } else { remove(); } } private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) { //将当前FastThreadLocal对象对应的数据添加到当前线程维护的InternalThreadLocalMap中 if (threadLocalMap.setIndexedVariable(index, value)) { //将当前FastThreadLocal对象保存到待清理的Set中 addToVariablesToRemove(threadLocalMap, this); } } private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { //获取threadLocalMap数组下标为0的元素 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set<FastThreadLocal<?>> variablesToRemove; //将variable添加到数组下标为0位置的Set集合中,以便可以通过remove()方法统一删除 if (v == InternalThreadLocalMap.UNSET || v == null) { //创建FastThreadLocal类型的Set集合 variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); //将variablesToRemove这个Set集合设置到数组下标为0的位置 threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } else { //强转获得Set集合 variablesToRemove = (Set<FastThreadLocal<?>>) v; } variablesToRemove.add(variable); } //Sets the value to uninitialized; //a proceeding call to get() will trigger a call to initialValue(). public final void remove() { remove(InternalThreadLocalMap.getIfSet()); } //Sets the value to uninitialized for the specified thread local map; //a proceeding call to get() will trigger a call to initialValue(). //The specified thread local map must be for the current thread. @SuppressWarnings("unchecked") public final void remove(InternalThreadLocalMap threadLocalMap) { if (threadLocalMap == null) { return; } //删除数组下标index位置对应的value Object v = threadLocalMap.removeIndexedVariable(index); //从数组下标0的位置取出Set集合,删除当前FastThreadLocal对象 removeFromVariablesToRemove(threadLocalMap, this); if (v != InternalThreadLocalMap.UNSET) { try { //和initValue()方法一样,可以被FastThreadLocal的子类重写 onRemoval((V) v); } catch (Exception e) { PlatformDependent.throwException(e); } } } //Returns the initial value for this thread-local variable. protected V initialValue() throws Exception { return null; } //Invoked when this thread local variable is removed by #remove(). //Be aware that #remove() is not guaranteed to be called when the `Thread` completes which means //you can not depend on this for cleanup of the resources in the case of `Thread` completion. protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { } ... }
public abstract class Recycler<T> { ... private static final class Stack<T> { //所属的Recycler final Recycler<T> parent; //所属线程的弱引用 final WeakReference<Thread> threadRef; //异线程回收对象时,其他线程能保存的被回收对象的最大个数 final AtomicInteger availableSharedCapacity; //WeakOrderQueue最大个数 private final int maxDelayedQueues; //对象池的最大大小,默认最大为4K private final int maxCapacity; //存储缓存数据的数组 DefaultHandle<?>[] elements; //缓存的DefaultHandle对象个数 int size; //WeakOrderQueue链表的三个重要结点 private WeakOrderQueue cursor, prev; private volatile WeakOrderQueue head; Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int interval, int maxDelayedQueues) { this.parent = parent; threadRef = new WeakReference<Thread>(thread); this.maxCapacity = maxCapacity; availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; this.interval = interval; handleRecycleCount = interval;//Start at interval so the first one will be recycled. this.maxDelayedQueues = maxDelayedQueues; } ... } ... }
public abstract class Recycler<T> { private static final int LINK_CAPACITY; static { ... LINK_CAPACITY = safeFindNextPositivePowerOfTwo(max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); ... } ... //a queue that makes only moderate guarantees about visibility: items are seen in the correct order, //but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain private static final class WeakOrderQueue extends WeakReference<Thread> { ... static final class Link extends AtomicInteger { final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY]; int readIndex; Link next; } private static final class Head { private final AtomicInteger availableSharedCapacity; Link link; ... } ... } ... }
public abstract class Recycler<T> { ... private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { //在Recycler中调用threadLocal.get()时,便会触发调用这个initialValue()方法 @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, interval, maxDelayedQueuesPerThread); } @Override protected void onRemoval(Stack<T> value) { //Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead if (value.threadRef.get() == Thread.currentThread()) { if (DELAYED_RECYCLED.isSet()) { DELAYED_RECYCLED.get().remove(value); } } } }; private final int interval; private final int maxCapacityPerThread; private final int maxSharedCapacityFactor; private final int maxDelayedQueuesPerThread; protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) { //默认是8,用于控制对象的回收比率 interval = safeFindNextPositivePowerOfTwo(ratio); if (maxCapacityPerThread <= 0) { this.maxCapacityPerThread = 0; this.maxSharedCapacityFactor = 1; this.maxDelayedQueuesPerThread = 0; } else { //对象池的最大大小,能存多少元素,默认4K this.maxCapacityPerThread = maxCapacityPerThread; //默认是2 this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor); //默认2倍CPU核数 this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread); } } private static final class Stack<T> { ... //异线程回收对象时,其他线程能保存的被回收对象的最大个数 final AtomicInteger availableSharedCapacity; Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int interval, int maxDelayedQueues) { ... availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); ... } ... } ... }
public abstract class Recycler<T> { ... public final T get() { ... //获取当前线程缓存的Stack Stack<T> stack = threadLocal.get(); //从Stack中弹出一个DefaultHandle对象 DefaultHandle<T> handle = stack.pop(); if (handle == null) { //创建一个DefaultHandle handle = stack.newHandle(); //创建一个对象并保存到DefaultHandle handle.value = newObject(handle); } return (T) handle.value; } //由Recycler的子类来实现创建对象 protected abstract T newObject(Handle<T> handle); private static final class Stack<T> { ... DefaultHandle<T> pop() { int size = this.size; if (size == 0) { //尝试从其他线程回收的对象中转移一些到Stack的DefaultHandle数组中 if (!scavenge()) { return null; } size = this.size; if (size <= 0) { return null; } } size --; //将对象实例从DefaultHandle数组(elements)的栈顶弹出 DefaultHandle ret = elements[size]; elements[size] = null; //As we already set the element[size] to null we also need to store the updated size before we do any validation. //Otherwise we may see a null value when later try to pop again without a new element added before. this.size = size; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } ret.recycleId = 0; ret.lastRecycledId = 0; return ret; } DefaultHandle<T> newHandle() { return new DefaultHandle<T>(this); } ... } ... }
public final class ReadTimeoutException extends TimeoutException { public static final ReadTimeoutException INSTANCE = PlatformDependent.javaVersion() >= 7 ? new ReadTimeoutException(true) : new ReadTimeoutException(); ReadTimeoutException() { } private ReadTimeoutException(boolean shared) { super(shared); } } @ChannelHandler.Sharable public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> { public static final MqttEncoder INSTANCE = new MqttEncoder(); private MqttEncoder() { } ... }
15.Netty设计模式之策略模式
(1)策略模式的特点
(2)策略模式的例子
(3)Netty中的策略模式
(1)策略模式的特点
一.封装一系列可相互替换的算法家族
二.动态选择某一个策略
(2)策略模式的例子
public class Strategy { private Cache cacheMemory = new CacheMemoryImpl(); private Cache cacheRedis = new CacheRedisImpl(); public interface Cache { boolean add(String key, Object object); } public class CacheMemoryImpl implements Cache { public boolean add(String key, Object object) { //保存到Memory return false; } } public class CacheRedisImpl implements Cache { public boolean add(String key, Object object) { //保存到Redis return false; } } public Cache getCache(String key) { if (key.length() < 10) { return cacheRedis; } else { return cacheMemory; } } }