@SpringBootApplication(scanBasePackages = {"io.seata"}) public class ServerApplication { public static void main(String[] args) throws IOException { //run the spring-boot application SpringApplication.run(ServerApplication.class, args); } } @Component public class ServerRunner implements CommandLineRunner, DisposableBean { private static final Logger LOGGER = LoggerFactory.getLogger(ServerRunner.class); private boolean started = Boolean.FALSE; private static final List<Disposable> DISPOSABLE_LIST = new CopyOnWriteArrayList<>(); public static void addDisposable(Disposable disposable) { DISPOSABLE_LIST.add(disposable); } @Override public void run(String... args) { try { long start = System.currentTimeMillis(); Server.start(args); started = true; long cost = System.currentTimeMillis() - start; LOGGER.info("seata server started in {} millSeconds", cost); } catch (Throwable e) { started = Boolean.FALSE; LOGGER.error("seata server start error: {} ", e.getMessage(), e); System.exit(-1); } } public boolean started() { return started; } @Override public void destroy() throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug("destoryAll starting"); } for (Disposable disposable : DISPOSABLE_LIST) { disposable.destroy(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("destoryAll finish"); } } } public class Server { //The entry point of application. public static void start(String[] args) { //create logger final Logger logger = LoggerFactory.getLogger(Server.class); //initialize the parameter parser //Note that the parameter parser should always be the first line to execute. //Because, here we need to parse the parameters needed for startup. ParameterParser parameterParser = new ParameterParser(args); //initialize the metrics //Seata Server是支持metric指标采集功能的 MetricsManager.get().init(); System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode()); //Seata Server里的Netty服务器的IO线程池,最小50个,最大500个 ThreadPoolExecutor workingThreads = new ThreadPoolExecutor( NettyServerConfig.getMinServerPoolSize(), NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy() ); //创建一个Netty网络通信服务器 NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); UUIDGenerator.init(parameterParser.getServerNode()); //log store mode : file, db, redis SessionHolder.init(parameterParser.getSessionStoreMode()); LockerManagerFactory.init(parameterParser.getLockStoreMode()); //启动定时调度线程 DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer); coordinator.init(); nettyRemotingServer.setHandler(coordinator); //let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); //127.0.0.1 and 0.0.0.0 are not valid here. if (NetUtil.isValidIp(parameterParser.getHost(), false)) { XID.setIpAddress(parameterParser.getHost()); } else { String preferredNetworks = ConfigurationFactory.getInstance().getConfig(REGISTRY_PREFERED_NETWORKS); if (StringUtils.isNotBlank(preferredNetworks)) { XID.setIpAddress(NetUtil.getLocalIp(preferredNetworks.split(REGEX_SPLIT_CHAR))); } else { XID.setIpAddress(NetUtil.getLocalIp()); } } //初始化Netty服务器 nettyRemotingServer.init(); } }
2.Seata Server的网络服务器启动的源码
创建和启动Seata的网络服务器:
public class NettyRemotingServer extends AbstractNettyRemotingServer { ... //Instantiates a new Rpc remoting server. 创建Seata Server public NettyRemotingServer(ThreadPoolExecutor messageExecutor) { super(messageExecutor, new NettyServerConfig()); } //启动Seata Server @Override public void init() { //registry processor registerProcessor(); if (initialized.compareAndSet(false, true)) { super.init(); } } private void registerProcessor() { //1.registry on request message processor ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler()); ShutdownHook.getInstance().addDisposable(onRequestProcessor); super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); //2.registry on response message processor ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures()); super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor); //3.registry rm message processor RegRmProcessor regRmProcessor = new RegRmProcessor(this); super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor); //4.registry tm message processor RegTmProcessor regTmProcessor = new RegTmProcessor(this); super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); //5.registry heartbeat message processor ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this); super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null); } ... } public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer { private final NettyServerBootstrap serverBootstrap; ... public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) { super(messageExecutor); //创建Netty Server serverBootstrap = new NettyServerBootstrap(nettyServerConfig); serverBootstrap.setChannelHandlers(new ServerHandler()); } @Override public void init() { super.init(); //启动Netty Server serverBootstrap.start(); } ... } public abstract class AbstractNettyRemoting implements Disposable { //The Timer executor. 由单个线程进行调度的线程池 protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true)); //The Message executor. protected final ThreadPoolExecutor messageExecutor; ... public void init() { //启动一个定时任务,每隔3秒检查发送的请求是否响应超时 timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) { MessageFuture future = entry.getValue(); if (future.isTimeout()) { futures.remove(entry.getKey()); RpcMessage rpcMessage = future.getRequestMessage(); future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString()))); if (LOGGER.isDebugEnabled()) { LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody()); } } } nowMills = System.currentTimeMillis(); } }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS); } ... } public class NettyServerBootstrap implements RemotingBootstrap { private final NettyServerConfig nettyServerConfig; private final EventLoopGroup eventLoopGroupBoss; private final EventLoopGroup eventLoopGroupWorker; private final ServerBootstrap serverBootstrap = new ServerBootstrap(); private ChannelHandler[] channelHandlers; private int listenPort; private final AtomicBoolean initialized = new AtomicBoolean(false); public NettyServerBootstrap(NettyServerConfig nettyServerConfig) { this.nettyServerConfig = nettyServerConfig; if (NettyServerConfig.enableEpoll()) { this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize())); this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } else { this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize())); this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } } //Sets channel handlers. protected void setChannelHandlers(final ChannelHandler... handlers) { if (handlers != null) { channelHandlers = handlers; } } @Override public void start() { this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ) .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark())) .localAddress(new InetSocketAddress(getListenPort())) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) .addLast(new ProtocolV1Decoder()) .addLast(new ProtocolV1Encoder()); if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); } } } ); try { this.serverBootstrap.bind(getListenPort()).sync(); XID.setPort(getListenPort()); LOGGER.info("Server started, service listen port: {}", getListenPort()); RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort())); initialized.set(true); } catch (SocketException se) { throw new RuntimeException("Server start failed, the listen port: " + getListenPort(), se); } catch (Exception exx) { throw new RuntimeException("Server start failed", exx); } } ... } public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { private RemotingServer remotingServer; private final DefaultCore core; private static volatile DefaultCoordinator instance; private final ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_ROLLBACKING, 1)); private final ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(RETRY_COMMITTING, 1)); private final ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(ASYNC_COMMITTING, 1)); private final ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(TX_TIMEOUT_CHECK, 1)); private final ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(UNDOLOG_DELETE, 1)); ... public static DefaultCoordinator getInstance(RemotingServer remotingServer) { if (null == instance) { synchronized (DefaultCoordinator.class) { if (null == instance) { instance = new DefaultCoordinator(remotingServer); } } } return instance; } private DefaultCoordinator(RemotingServer remotingServer) { if (remotingServer == null) { throw new IllegalArgumentException("RemotingServer not allowed be null."); } this.remotingServer = remotingServer; this.core = new DefaultCore(remotingServer); } public void init() { retryRollbacking.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS); retryCommitting.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); asyncCommitting.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS); timeoutCheck.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS); undoLogDelete.scheduleAtFixedRate( () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete), UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS); } ... }
//Propagation level of global transactions. //全局事务的传播级别 public enum Propagation { //如果全局事务已经存在,此时会直接在当前的全局事务里继续去运行下去,后续运行的都是全局事务里的分支事务 //如果全局事务此时还不存在,就会开启一个新的全局事务来运行 //这种全局事务传播级别,就是REQUIRED //The logic is similar to the following code: // if (tx == null) { // try { // tx = beginNewTransaction(); // begin new transaction, is not existing // Object rs = business.execute(); // execute with new transaction // commitTransaction(tx); // return rs; // } catch (Exception ex) { // rollbackTransaction(tx); // throw ex; // } // } else { // return business.execute(); // execute with current transaction // } REQUIRED, //如果全局事务已经存在,则先暂停该事务,然后开启一个新的全局事务来执行业务 //The logic is similar to the following code: // try { // if (tx != null) { // suspendedResource = suspendTransaction(tx); // suspend current transaction // } // try { // tx = beginNewTransaction(); // begin new transaction // Object rs = business.execute(); // execute with new transaction // commitTransaction(tx); // return rs; // } catch (Exception ex) { // rollbackTransaction(tx); // throw ex; // } // } finally { // if (suspendedResource != null) { // resumeTransaction(suspendedResource); // resume transaction // } // } REQUIRES_NEW, //如果全局事务已经存在,则先暂停该事务,然后不要使用全局事务来执行业务 //The logic is similar to the following code: // try { // if (tx != null) { // suspendedResource = suspendTransaction(tx); // suspend current transaction // } // return business.execute(); // execute without transaction // } finally { // if (suspendedResource != null) { // resumeTransaction(suspendedResource); // resume transaction // } // } NOT_SUPPORTED, //如果全局事务不存在,则不要使用全局事务来执行业务 //如果全局事务存在,则使用全局事务来执行业务 //The logic is similar to the following code: // if (tx != null) { // return business.execute(); // execute with current transaction // } else { // return business.execute(); // execute without transaction // } SUPPORTS, //如果全局事务存在,则抛异常 //如果全局事务不存在,则执行业务 //The logic is similar to the following code: // if (tx != null) { // throw new TransactionException("existing transaction"); // } // return business.execute(); // execute without transaction NEVER, //如果全局事务不存在,则抛异常 //如果全局事务存在,则使用全局事务去执行业务 //The logic is similar to the following code: // if (tx == null) { // throw new TransactionException("not existing transaction"); // } // return business.execute(); // execute with current transaction MANDATORY }
8.全局事务执行模版根据传播级别来执行业务
//全局事务执行模版 public class TransactionalTemplate { ... //Execute object. //通过全局事务生命周期管理组件执行全局事务 public Object execute(TransactionalExecutor business) throws Throwable { //1.Get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. //根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务 //刚开始在开启一个全局事务的时候,是没有全局事务的 GlobalTransaction tx = GlobalTransactionContext.getCurrent(); //1.2 Handle the transaction propagation. //从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED //也就是如果存在一个全局事务,就直接执行业务; //如果不存在一个全局事务,就开启一个新的全局事务; Propagation propagation = txInfo.getPropagation(); //不同的全局事务传播级别,会采取不同的处理方式 //比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid //可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别 SuspendedResourcesHolder suspendedResourcesHolder = null; try { switch (propagation) { case NOT_SUPPORTED: //If transaction is existing, suspend it. if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); } //Execute without transaction and return. return business.execute(); case REQUIRES_NEW: //If transaction is existing, suspend it, and then begin new transaction. if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); tx = GlobalTransactionContext.createNew(); } //Continue and execute with new transaction break; case SUPPORTS: //If transaction is not existing, execute without transaction. if (notExistingTransaction(tx)) { return business.execute(); } //Continue and execute with new transaction break; case REQUIRED: //If current transaction is existing, execute with current transaction, else continue and execute with new transaction. break; case NEVER: //If transaction is existing, throw exception. if (existingTransaction(tx)) { throw new TransactionException(String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s", tx.getXid())); } else { //Execute without transaction and return. return business.execute(); } case MANDATORY: //If transaction is not existing, throw exception. if (notExistingTransaction(tx)) { throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'"); } //Continue and execute with current transaction. break; default: throw new TransactionException("Not Supported Propagation:" + propagation); } //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'. if (tx == null) { //如果xid为null,则会创建一个新的全局事务 tx = GlobalTransactionContext.createNew(); } //set current tx config to holder GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, //else do nothing. Of course, the hooks will still be triggered. //开启一个全局事务 beginTransaction(txInfo, tx); Object rs; try { //Do Your Business //执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并执行一个一个分支事务 rs = business.execute(); } catch (Throwable ex) { //3. The needed business exception to rollback. //发生异常时需要完成的事务 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } //4. everything is fine, commit. //如果一切执行正常就会在这里提交全局事务 commitTransaction(tx); return rs; } finally { //5. clear //执行一些全局事务完成后的回调,比如清理等工作 resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } } finally { //If the transaction is suspended, resume it. if (suspendedResourcesHolder != null) { //如果之前挂起了一个全局事务,此时可以恢复这个全局事务 tx.resume(suspendedResourcesHolder); } } } ... }
@LoadLevel(name = ROUND_ROBIN_LOAD_BALANCE) public class RoundRobinLoadBalance implements LoadBalance { private final AtomicInteger sequence = new AtomicInteger(); @Override public <T> T select(List<T> invokers, String xid) { int length = invokers.size(); //通过轮询选择Seata Server的节点 return invokers.get(getPositiveSequence() % length); } private int getPositiveSequence() { for (;;) { int current = sequence.get(); int next = current >= Integer.MAX_VALUE ? 0 : current + 1; if (sequence.compareAndSet(current, next)) { return current; } } } }
二.随机选择算法
@LoadLevel(name = RANDOM_LOAD_BALANCE) public class RandomLoadBalance implements LoadBalance { @Override public <T> T select(List<T> invokers, String xid) { int length = invokers.size(); return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
三.最少使用算法
@LoadLevel(name = LEAST_ACTIVE_LOAD_BALANCE) public class LeastActiveLoadBalance implements LoadBalance { @Override public <T> T select(List<T> invokers, String xid) { int length = invokers.size(); long leastActive = -1; int leastCount = 0; int[] leastIndexes = new int[length]; for (int i = 0; i < length; i++) { long active = RpcStatus.getStatus(invokers.get(i).toString()).getActive(); if (leastActive == -1 || active < leastActive) { leastActive = active; leastCount = 1; leastIndexes[0] = i; } else if (active == leastActive) { leastIndexes[leastCount++] = i; } } if (leastCount == 1) { return invokers.get(leastIndexes[0]); } return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
四.一致性哈希算法
@LoadLevel(name = CONSISTENT_HASH_LOAD_BALANCE) public class ConsistentHashLoadBalance implements LoadBalance { public static final String LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES = LOAD_BALANCE_PREFIX + "visualNodes"; private static final int VIRTUAL_NODES_NUM = ConfigurationFactory.getInstance().getInt(LOAD_BALANCE_CONSISTENT_HASH_VISUAL_NODES, VIRTUAL_NODES_DEFAULT); @Override public <T> T select(List<T> invokers, String xid) { //通过一致性哈希选择节点 return new ConsistentHashSelector<>(invokers, VIRTUAL_NODES_NUM).select(xid); } private static final class ConsistentHashSelector<T> { private final SortedMap<Long, T> virtualInvokers = new TreeMap<>(); private final HashFunction hashFunction = new MD5Hash(); ConsistentHashSelector(List<T> invokers, int virtualNodes) { for (T invoker : invokers) { for (int i = 0; i < virtualNodes; i++) { virtualInvokers.put(hashFunction.hash(invoker.toString() + i), invoker); } } } public T select(String objectKey) { SortedMap<Long, T> tailMap = virtualInvokers.tailMap(hashFunction.hash(objectKey)); Long nodeHashVal = tailMap.isEmpty() ? virtualInvokers.firstKey() : tailMap.firstKey(); return virtualInvokers.get(nodeHashVal); } } @SuppressWarnings("lgtm[java/weak-cryptographic-algorithm]") private static class MD5Hash implements HashFunction { MessageDigest instance; public MD5Hash() { try { instance = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } } @Override public long hash(String key) { instance.reset(); instance.update(key.getBytes()); byte[] digest = instance.digest(); long h = 0; for (int i = 0; i < 4; i++) { h <<= 8; h |= ((int) digest[i]) & 0xFF; } return h; } } public interface HashFunction { long hash(String key); } }