MongoDB手记(一)

写在前面:发现开始实习后,时间还是挺紧的,没有在学校那么闲了。所以这博客也好久没更新了。前几天老大让我研究下MongoDB,主要涉及性能以及维护方面的内容,于是就出现了题目中提到的“MongoDB手记”。今天就把我前几天零零碎碎整理的MongoDB心得贴上来,和大家分享下,顺带图省事,充当一篇博客吧(==!)。这个“MongoDB手记”还只写了一小部分,后续内容还在努力奋斗中。(本文中一些测试时间的截图被略去,因为我懒得传那么多图了==!)

1、Replica Sets

1.1、简介

异步的Master/Slave集群。

有一个主节点和N个从节点组成,外加可选的仲裁节点。

当主节点宕机时,整个系统会自动选举新的主节点,从而实现故障的自动恢复。

只有当某个节点获得票数大于总节点数的一半,才能成为主节点。

仲裁节点只负责投票,不能成为主节点。

所有的“写请求”(这里为广义的写,包括修改和删除等操作)都发到主节点中。

主节点会把请求记录在oplog(在MongoDB中存储为一个Collection)中。

从节点以一定的频率从主节点读取其oplog,并更新其自己的数据。

因此从节点在数据上会有一定的延迟性。

1.2、“写”入

所有的“写”请求都由主节点来处理。

主节点会把每次对于数据库的修改保存到oplog中。oplog在MongoDB中保存为一个“capped collection”。如下图:

 

1.3、从节点同步

从节点会以一定的频率不断的从主节点同步oplog到本地。然后把oplog中记录的操作应用在本地数据库中,从事使从库和主库数据同步。如下图:

 

1.4、主节点宕机

当主节点宕机时,整个集群会自动切换到从节点。经测试Java的MongoDB Driver可以实现主节点的自动切换,而不需要修改代码。

1.4.1、主节点宕机恢复时间测试

测试环境为4个节点。3个是数据节点,一个是仲裁节点。当客户端向MongoDB中写入数据过程中,kill掉主节点的mongod进程,观察主节点切换情况。

运行结果:

MongoDB日志:

 

测试结果:

通过两次测试,发现当主节点宕机后,客户端需要6~7秒钟的时间才能重新连接到新的主节点上。(时间偏长,记得当时那个人讲的是3秒左右,有可能是我配置的问题)

1.4.2、主节点宕机数据丢失

由MongoDb的主从节点同步机制可以看出,从节点的数据总是慢于主节点。如果当主节点宕机时,还有未同步到从节点的数据。那么这些数据在主节点从故障恢复后,会发生回滚。回滚的这些数据会“丢失”。

这里的“丢失”并不是真的找不到了。而是会存储到rollback目录下。用户可以在确保没有数据冲突的情况下,将这些数据手动导入数据空中。

1.5、读取

从节点的数据会慢于主节点。所以从从节点读取的数据可能是过期的。所以默认情况下,客户端只能从主节点读取数据。但是对于那些对时间不敏感的应用,可以通过设计“slaveOkay”来从从节点读取数据。从而分散数据库的读压力。

1.5.1、单节点读性能测试:

一台客户机10个线程从主节点随机读取5000000次数据(总数据大小也是5000000)的时间:

约合4万/秒

1.5.2、多节点读性能测试:

2台客户机60个线程从任意节点共随机读取10000000次数据的时间:

约合11万/秒。

数据总量比较小,数据和索引能够全部保存在内存中。

2、Durability

2.1、简介

MongoDB使用mmap把磁盘文件映射到内存,并定期把内存中的数据flush到磁盘中。所以如果MongoDb所在的服务器因为某些原因突然宕机,就可能造成在内存中还没有flush到磁盘中的数据丢失。

2.2、普通模式

即上述的模式。对于数据的持久性并没有保证。如果突然宕机,可能造成数据的丢失。但是写性能比较好。

2.2.1、单线程写入测试:

单线程写入1000000个数据

约合5万/秒

2.2.2、多线程写入测试:

10个线程写入1000000数据

发现多线程性能反而变慢。这是由于mongodb的锁是库级别的。多线程竞争,反而会降低性能。

2.3、Journaling

从某版本开始,MongoDB提供了该特性。用户可以在启动mongod进程时指定使用该参数。

使用该参数后,会在每次写请求时,向日志文件中追加一条记录。这样即使发生宕机,也可以通过日志文件恢复数据。

并且为了防止写日志文件成为瓶颈,采用了group commit。即在一段时间内(目前是100ms)的日志记录会一次性统一写入。

2.3.1、写入性能测试

20个线程,共写入1000000的数据

可以看出耗时48秒左右。

约合2万/秒

3、写请求何时返回

默认情况下对MongoDB的写操作并没有响应结果。即用户发起写请求,但是用户并不知道到底有没有写入成功。

用户可以调用getLastError命令来获取上次写请求的情况。

该命令有若干可选的参数。

W: 如果该值为N,则表示只有当用户的写请求被同步到N太服务器上时,才返回。

该参数并不能保证数据已经永久写入服务器中。因为该参数只能保证数据被同步到N台服务器上,但是并没有确保数据已经写入那些服务器的磁盘中。所以如果服务器发生宕机,还是有可能造成数据丢失。

Fysnc: 该参数的行为取决于是否使用了—journal。如果为使用—journal,则参数的作用是强制数据库把内存中的数据flush到磁盘中,然后才返回成功。

如果使用了—journal,则该参数会等待下一次的group commit发生,确保数据已经被写入日志,然后才返回。

3.1、JAVA中的对应

这几个参数在JAVA中对应为“WriteConcern”,如下图:

3.2、性能测试

提高了数据的安全性后,会对数据写入的性能造成影响。

3.2.1、SAFE写入

即确保数据同步到数据库。

30个线程,开启—journal

可以看出需要1分钟左右。约合1.7万/秒。

3.2.2、FYSNC写入

即确保数据刷新或者刷日志

3.2.2.1、开启—journal

1客户机,30个线程,1万个数据。

插入时间要100秒。约合100次/

这是由于开启了—journal和FSYNC后,每次写请求要等一次group commit结束才返回。而一次group commit的时间间隔为100ms。所以这样导致一次写请求发起,到返回,需要耗费100ms左右的时间,所以客户端的写入时间就变长。但是服务端并没有达到饱和。

接下来我们使用两个客户机,每个客户机30个线程。写入20000个数据。结果我们发现耗费的时间仍是100多秒。约合200次/

所以可以看出这时服务器并没有达到饱和。

当继续增加线程数的时候,客户端抛出了如下异常:

可以看出,由于要获得的连接数过多,导致连接获得失败。

查看JAVA的Driver的代码看出,是由于默认设计的参数过小。

在增大参数后,重新测试:

3个客户机,每个500个线程。共写入150000个数据:

约合3000次/

3.2.2.2、不开启—journal

待续

4.Sharding

努力奋斗中

“HBase性能调优”之代码说话

前一段时间在“淘宝JAVA中间件团队”的博客上看到了一片关于“HBase性能调优”的文章(在这里)。里面对于HBase的一些参数的设置给出了建议和原因。这篇博客打算从代码的角度来看一下,为什么那些参数的调整会对系统的性能产生影响。

1、hbase.regionserver.handler.count

该参数与HBaseRPC请求有关,是处理RPC请求的线程的数量。我之前写过一篇关于HadoopRPC代码分析的博客(在这里),而HBase的RPC实现与Hadoop的大同小异,这里就略去了,大家可以看一下我之前那篇文章来详细了解下。

2、hbase.hregion.memstore.flush.size

这个地方我和淘宝那个博客有不同的理解。

淘宝的原文是这样的:

hbase.hregion.memstore.flush.size 这个参数的作用是 当单个memstore达到指定值时,flush该memstore。

淘宝说这个值是“单个MemStore的阈值,但是结合源代码,我认为这个应该是一个HResion中“所有MemStore的值的“”的阈值。代码分析如下: 首先看一下HRegion的构造函数,这里从配置文件中读取该参数的值,如果没设定,则按默认值64M来算。

Java语言: 临时自用代码
1 long flushSize = regionInfo.getTableDesc().getMemStoreFlushSize();
2 if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
3     flushSize = conf.getLong(“hbase.hregion.memstore.flush.size”,
4             HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
5 }
6 this.memstoreFlushSize = flushSize;

然后我们以PUT过程为例(本博客中的代码为了方便说明问题,都进行了省略,只列出关键语句),其中的memstoreSize变量为AtomicLong类型的类成员变量。是整个HRegionMemStore的总大小的值。

Java语言: 临时自用代码
01 private void put(final Map<byte[], List<KeyValue>> familyMap,
02         boolean writeToWAL) throws IOException {
03     boolean flush = false;
04     // 把值插入到相应的MemStore中,并返回所有MemStore增加的大小
05     long addedSize = applyFamilyMapToMemstore(familyMap);
06     // 修改该HRegion的MemStore的总大小,并比较是否超过阈值
07     flush = isFlushSize(memstoreSize.addAndGet(addedSize));
08     // 如果超过则请求flush
09     if (flush) {
10         requestFlush();
11     }
12 }
Java语言: 临时自用代码
1 private boolean isFlushSize(final long size) {
2     return size > this.memstoreFlushSize;
3 }

以上代码都是HRegion类中的,所以可以看出,“hbase.hregion.memstore.flush.size”这个参数应该是整个HRegion中所有HStoreMemStore的大小总和的阈值。

3、hbase.hregion.memstore.block.multiplier

这个参数与上一个参数息息相关。当一个HRegion中的MemStore的总大小超过阈值后,会出发flush请求。但是这个仅是个请求,并不能保证flush一定执行(如storefile过多等原因,后文将提到)。所以为了防止MemStore的值过大,就引入了这个参数。该参数是个倍数,表示一个HRegionMemStore的总大小最大可以是“hbase.hregion.memstore.flush.size”的几倍。如果超过这个值,则会阻塞该HRegion写请求,等待flush。代码如下: 先看一下HRegion中对该参数的初始化:

Java语言: 临时自用代码
1 this.blockingMemStoreSize = this.memstoreFlushSize
2         * conf.getLong(“hbase.hregion.memstore.block.multiplier”, 2);

然后我们来看一下HRegionput函数:

Java语言: 临时自用代码
1 public void put(Put put, Integer lockid, boolean writeToWAL)
2         throws IOException {
3     // 进行必要的检查   
4     checkResources();
5     // 调用实际的put函数
6     put(put.getFamilyMap(), writeToWAL); 7 }

可以看出,在执行实际的put前,会进行必要的检查。那我们就来看一下checkResources函数的实现:

Java语言: 临时自用代码
01 private void checkResources() {
02     while (this.memstoreSize.get() > this.blockingMemStoreSize) {
03         requestFlush();
04         synchronized (this) {
05             try {
06                 wait(threadWakeFrequency);
07             } catch (InterruptedException e) {
08                 // continue;
09             }
10         }
11     }
12 }

可以看出,如果该HRegionMemStore的总大小超过了blockingMemStoreSize,则会请求flush操作,然后该线程会进入阻塞状态,知道memstoreSize的大小降到合适的范围。

4、hbase.regionserver.global.memstore.upperLimit/lowerLimit

这两个参数和上面的参数关注的是不同的方向。这两个参数关注的是该RegionServer上所有的MemStore的总大小占整个内存大小的比例。代码如下: 首先是参数的初始化代码:

Java语言: 临时自用代码
01 // 构造函数
02 //
03 // 获得内存最大值
04 long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
05 // 获得上限
06 this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,UPPER_KEY, conf);
07 // 获得下限
08 long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
09 if (lower > this.globalMemStoreLimit) {
10     lower = this.globalMemStoreLimit;
11 }
12 this.globalMemStoreLimitLowMark = lower;
13 //
14 // 构造函数结束
15
16
17 static long globalMemStoreLimit(final long max,
18         final float defaultLimit, final String key, final Configuration c) {
19     float limit = c.getFloat(key, defaultLimit);
20     return getMemStoreLimit(max, limit, defaultLimit);
21 }
22
23 static long getMemStoreLimit(final long max, final float limit,
24         final float defaultLimit) {
25     return (long) (max * limit);
26 }

之后我们来看一下HRegionServer中的put方法:

Java语言: 临时自用代码
01 @Override
02 public void put(final byte[] regionName, final Put put) throws IOException {
03     // 获得相应的HRegion对象
04     HRegion region = getRegion(regionName);
05     try {
06         if (!region.getRegionInfo().isMetaTable()) {
07             // 检查MemStore的大小
08             this.cacheFlusher.reclaimMemStoreMemory();
09         }
10         // 是否写入HLog
11         boolean writeToWAL = put.getWriteToWAL();
12         // 执行具体的put
13         region.put(put, getLockFromId(put.getLockId()), writeToWAL);
14     } catch (Throwable t) {
15         throw convertThrowableToIOE(cleanup(t));
16     }
17 }

可以看出,当HRegionServer收到用户的put请求后,会首先让cacheFlusher检查MemStore的大小情况,然后才调用具体HRegionput方法。那下面我们就来看一下reclaimMemStoreMemory这个函数:

Java语言: 临时自用代码
01 public synchronized void reclaimMemStoreMemory() {
02     // 超过上限
03     if (isAboveHighWaterMark()) {
04         lock.lock();
05         try {
06             // 等待,直到低于上限
07             while (isAboveHighWaterMark() && !server.isStopped()) {
08                 wakeupFlushThread();
09                 try {
10                     // 阻塞等待flush完成
11                     flushOccurred.await(5, TimeUnit.SECONDS);
12                 } catch (InterruptedException ie) {
13                     Thread.currentThread().interrupt();
14                 }
15             }
16         } finally {
17             lock.unlock();
18         }
19     } else if (isAboveLowWaterMark()) {
20         // 不超过上限,但是超过下限,则请求一次flush,然后返回
21         wakeupFlushThread();
22     }
23 }
24
25 private boolean isAboveHighWaterMark() {
26     return server.getGlobalMemStoreSize() >= globalMemStoreLimit;
27 }
28
29
30 private boolean isAboveLowWaterMark() {
31     return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark;
32 }

可以看出,如果总大小超过上限,则会请求一次flush,然后再次检测,知道低于上限为止,而仅仅超过下限的话,则只请求一次flush,然后就返回。

5、hbase.hstore.blockingStoreFiles

这点又是我和淘宝那个博客理解不一样的地方。

淘宝原文如是说:

在compaction时,如果一个Store(Coulmn Family)内有超过7个storefile需要合并,则block所有的写请求,进行flush,限制storefile数量增长过快。

我认为淘宝这点说的很有问题,首先我们来看一下官方文档里对这个参数是怎么解释的:

If more than this number of StoreFiles in any one Store (one StoreFile is written per flush of MemStore) then updates are blocked for this HRegion until a compaction is completed, or until hbase.hstore.blockingWaitTime has been exceeded.

可以看出,文档中的意思是,当一个HRegion中的任何一个HStore的文件数大于该阈值时,该HRegionupdate会被阻塞,直到过多的store文件在compact中被合并或者超时。

下面我们就来结合代码来看一下这个参数到底有什么作用。

上文我们一直提到“请求flush”,但是一直没有了解细节,下面我们就来看一下到底是如何flush的。

我们的flush分为两种,一种是有目的的flush,即指定flush某个HRegion中的MemStore,例如前面讲“ hbase.hregion.memstore.flush.size ”参数时提到的情况。还有一种是没有目的的flush。该flush会从所有的HRegion中找出一个最适合的HRegionflush。如前面讲“hbase.regionserver.global.memstore.upperLimit ”提到的情况。

HBaseflush工作是由一个独立的线程完成的。该线程不停的从一个Flush请求队列中获得请求,然后进行Flush

下面我们来看一下该线程的run函数:

Java语言: 临时自用代码
01 @Override
02 public void run() {
03     while (!this.server.isStopped()) {
04         FlushQueueEntry fqe = null;
05         try {
06             // 获得Flush请求
07             fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
08             // 如果为空或者为普通flush请求
09             if (fqe == null || fqe instanceof WakeupFlushThread) {
10                 // 如果超过下限
11                 if (isAboveLowWaterMark()) {
12                     // 选出一个最适合的HRegion,并进行flush
13                     flushOneForGlobalPressure();
14                 }
15                 continue;
16             }
17             // 如果为特定HRegion的flush请求
18             FlushRegionEntry fre = (FlushRegionEntry) fqe;
19             if (!flushRegion(fre)) {
20                 break;
21             }
22         } catch (Exception ex) {
23             continue;
24         }
25     }
26 }

下面我们来看一下指定要FlushHRegion的这种情况:

Java语言: 临时自用代码
01 private boolean flushRegion(final FlushRegionEntry fqe) {
02     // 获得要Flush的HRegion
03     HRegion region = fqe.region;
04     // 如果该HRegion有过多的storefile
05     if (!fqe.region.getRegionInfo().isMetaRegion()
06             && isTooManyStoreFiles(region)) {
07         // 请求compact
08         this.server.compactSplitThread.requestCompaction(region, getName());
09         // 把Flush请求重新放回队列,延迟处理
10         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
11         // Tell a lie, it’s not flushed but it’s ok
12         return true;
13     }
14     // 如果满足flush的要求,则进行flush
15     return flushRegion(region, false);
16 }
Java语言: 临时自用代码
01 this.blockingStoreFilesNumber =
02     conf.getInt(“hbase.hstore.blockingStoreFiles”, 7);
03
04 private boolean isTooManyStoreFiles(HRegion region) {
05     for (Store hstore : region.stores.values()) {
06         if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
07             return true;
08         }
09     }
10     return false;
11 }

可以看出如果被请求FlushHRegion中的任意一个HStore中的storefile的数量大于了阈值,则会把该请求重新放入队列中,延迟处理。并返回true,假装flush成功。可以看出,这个过程并没有发生阻塞,并不向文档说的那样。个人认为实际情况应该向淘宝一位同学博客中说的那样(在这里):

Memstore在flush region时会先检查store中store file是不是太多了,太多了则会先进行compact,并将flush推迟,而flush推迟的过程中,会导致memstore中写了很多的数据,在memstore的大小大于了其两倍的空间(默认情况下也就是128m)时,会阻塞住此时的写/删除请求,等到memstore大小降下去后才会恢复。

结语:上述只是我个人对代码的理解,难免有偏颇之处,欢迎大家指出。

Java内部类访问局部变量时的final问题

JAVA用了也快三年了,内部类访问局部变量的情况也没少遇到。也一直知道要给变量加个final修饰符,不然通过不了编译。但一直也没深究过为什么要加。昨天好奇的上网查了下,并翻阅了下相关的书籍(Core Java 8th),终于算是搞明白了,在这里简单说明下。

说先我们来看一段示例代码:

Java语言: 临时自用代码
01 public void start(int interval, final boolean beep) {
02
03     // Inner Class
04     class TimePrinter implements ActionListener {
05
06         @Override
07         public void actionPerformed(ActionEvent event) {
08             Date now = new Date();
09             System.out.println(“At the tone, the time is “ + now);
10             if (beep) {
11                 Toolkit.getDefaultToolkit().beep();
12             }
13         }
14     }
15     //
16     ActionListener listener = new TimePrinter();
17     Timer t = new Timer(interval, listener);
18     t.start();
19 }

我们在start函数中定义了一个内部类TimePrinter,其中访问了函数的局部变量beep

为了说明内部类访问局部变量为什么要加final关键字,我们先来看一下JAVA对内部类的实现。

假设上述代码中start函数所在的类的名称为TalkingClock。则编译上述代码的时候,JAVA编译器会把TimePrinter内部类编译为一个独立的class文件。其形式如下(类名为:外部类$内部类):

Java语言: 临时自用代码
01 class TalkingClock$1TimePrinter {
02     // 添加的构造函数,参数为外部类对象的引用和该内部类访问的局部变量的引用(这里为boolean类型)
03     TalkingClock$1TimePrinter(TalkingClock, boolean);
04     // 内部类原有的函数
05     public void actionPerformed(java.awt.event.ActionEvent);
06     // 局部变量的引用
07     final boolean val$beep;
08     // 外部类对象的引用
09     final TalkingClock this$0;
10 }

通过上述类的定义,我们可以看出内部类在构造的时候,会被编译器自动传入外部类对象的一个引用,同时也会传入内部类访问的局部变量的引用,这也就解释了内部类对象为什么可以访问外部类的成员变量和函数还有局部变量了。但是由于这些工作是在编译是进行的,JAVA虚拟机并没有什么所谓的内部类的概念,在JAVA虚拟机看来,该内部类和外部类是两个独立的class文件。我们知道,一个类的私有函数和成员变量是不能被其他类访问的。那么内部类又是如何访问外部类的私有成员变量和函数呢?

我们假设外部类中有一个私有的int型的变量counter。我们想在内部类中访问它。其实在编译的时候,为了内部类可以访问外部类的私有变量,JAVA编译器还偷偷做了一些额外的工作。编译器除了上文提到的会生成一个内部类类,同时还会修改我们的外部类代码。其修改如下:

Java语言: 临时自用代码
1 class TalkingClock {
2     // 编译器自动添加的函数,用来访问私有成员变量counter
3     static int access$0(TalkingClock);
4     // 原有的函数
5     public void start();
6     // 私有成员变量
7     private int counter
8 }

可以看出,为了访问counter私有成员变量,编译器偷偷的为我们添加了一个access$0的静态函数,它接收一个TalkingClock对象的引用,并返回该对象内的coutner的值。并且编译器会把我们在内部类中用到counter的地方都替换为TalkingClock.access$0(this$0)

好了,现在我们已经了解了内部类的实现机制,那我们最后来看一下访问局部变量为什么要求局部变量添加final关键字。

还是以我们最开始的那个start函数为例。我们来看一下该函数的可能的执行过程:

如果beep变量不被标注为final,那么就意味着我们可以随时修改beep的值。假设我们在创建了TimePrinter对象后修改了beep的值,那么这时我们的内部类所看到的beep的值还是之前通过构造函数传递进去的老值,这样就导致内部类和外部函数对beep值“认识”的不一致。所以final关键字的目的就是为了保证内部类和外部函数对变量“认识”的一致性。

结束语:这个内部类final的问题从最开始学JAVA时就遇到了,期间也有想过为什么要加,但最终都没有深究,就理所当然的认为是规定了。其实这是一个很不好的习惯。学习东西就要“知其然,知其所以然”,一知半解最是害人。希望大家都可以引以为鉴。

博客代码高亮

在网上发现的代码高亮工具,输入目标代码,自动生成高亮后的HTML代码。

地址: http://fayaa.com/code/home/

测试效果如下:

Java语言: 临时自用代码
01 package cn.edu.nju.software.lidejia.hellojava;
02
03 /**
04 *
05 * @author lidejia
06 */
07 public class Main {
08
09     private static final int NUM = 10;
10
11     /**
12      * @param args the command line arguments
13      */
14     public static void main(String[] args) {
15         for(int i = 0; i < NUM; i++) {
16             System.out.println(“Hello World”);
17         }
18     }
19 }

Java NIO 网络编程之Apache MINA

前一段在网上看到了“淘宝伯岩”(boyan@taobao.com)的一份关于Java NIO 网络编程的讲义《NIO trick and trap——编写高性能Java NIO网络框架》。其中里面提到了Java NIO在网络编程方面的应用和编程模型,同时也提到了Apache的一个开源网络框架MINA。 正好自己对于NIO对网络编程的应用也不是太熟悉,于是就简单了解了下MINA。本文并不是针对于MINA这个框架如何使用(相关内容可以参见相关文档),而是整个MINA框架对于NIO的应用。

1、整体架构

首先我们来看一下MINA Server端的整体架构(图片源自MINA官方文档)。 其中的IoService用来负责实际的网络连接,监听用户请求等功能。 对于每一个新的连接,MINA都会创建一个与之对应的Session。 MINA收到或者要发送的数据会经过一系列的Filter。过滤器可以用来对消息进行过滤和规范化。 最后MINA会调用用户实现的一个IoHandler,来处理经过过滤器过滤的消息。 可以看出,对于框架的使用者来说,网络的具体编程是对其透明的,用户只需要实现相应的Handler,并且设置好Filter即可。

2、IoService细节

2.1、继承关系

首先我们来看一下核心几个类的继承关系。

其中IoService为整个框架的核心接口,其代表了网络服务。

IoAcceptorIoService一个子接口,用来代表对用户请求的接受和监听。

SocketAcceptorIoAcceptor的一个子接口,用来代表基于Socket的网络请求。

AbstractIoServiceAbstractIoAcceptor分别是对IoServiceIoAcceptor的一个默认抽象实现。

AbstractPoolingIoAcceptor则是引入了池的概念,建立了处理线程池等。

最后NIOSocketAcceptorAbstractPoolingIoAcceptor的NIO的一个实现。

2.2、整体流程

  • 创建一个NIOSocketAcceptor对象。
  • 调用IoAcceptorbind方法,创建SocketChannel,并绑定到之前的Selector上。
  • 创建一个新的线程,用来监听用户的连接。如果收到用户的连接请求,则为其创建一个Session,并把Session加入到一个Processor中等待处理。
  • 系统中有若干个Processor,每个有自己的线程。在该线程中,也存在一个Selector,用来监听所有该Processor上的Session。如果某个Session有数据可以读取或者写入,则将数据传递给一系列的Filter,并最终调用相应的Handler进行处理。

2.3、具体代码分析

为了简明起见,下面的代码只是整个代码的截取,目的是为了更好的说明问题。完整的代码请详见项目源文件。

2.3.1、构造

我们首先来看一下NioSocketAcceptor的构造函数。

  1. public NioSocketAcceptor() {

  2.     super(new DefaultSocketSessionConfig(), NioProcessor.class);

  3.     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);

  4. }

该构造函数传给了父类构造函数两个参数。首先是一个DefaultSocketSessionConfig的实例,用来表示一个配置对象。之后是NioProcessor.class,用来指明处理对象的类型。

接下来我们来看一下 NioSocketAcceptor的父类 AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>

  1. protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<NioSession>> processorClass) {

  2.     // 根据之前子类传递过来的processorClass,新建一个SimpleIoProcessorPool对象

  3.     // 并调用另一个构造函数

  4.     this(sessionConfig, null, new SimpleIoProcessorPool<NioSession>(processorClass), true);

  5. }

  6. private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<NioSession> processor, boolean createdProcessor) {

  7.     // 调用父类构造函数

  8.     super(sessionConfig, executor);

  9.     // 调用初始化函数

  10.     init();

  11. }

其中的init函数的实现在 NioSocketAcceptor中,代码如下:

  1. @Override

  2. protected void init() throws Exception {

  3.     // 创建一个Selector对象

  4.     selector = Selector.open();

  5. }

其中的SimpleIoProcessorPool<NioSession>的构造函数如下:

  1. public SimpleIoProcessorPool(Class<? extends IoProcessor<NioSession>> processorType) {

  2.     // 用默认参数调用另一构造函数

  3.     this(processorType, null, DEFAULT_SIZE);

  4. }

  5. public SimpleIoProcessorPool(Class<? extends IoProcessor<NioSession>> processorType, Executor executor, int size) {

  6.     // 若executor为null则新建,不然则使用传入的对象。

  7.     createdExecutor = (executor == null);

  8.     if (createdExecutor) {

  9.         this.executor = Executors.newCachedThreadPool();

  10.     } else {

  11.         this.executor = executor;

  12.     }

  13.     // 新建一个IoProcessor池

  14.     pool = new IoProcessor[size];

  15.     // 根据传入的processorType,利用Java的反射机制来创建对象,填填满对象池

  16.     Constructor<? extends IoProcessor<S>> processorConstructor  = processorType.getConstructor(ExecutorService.class);

  17.     for(int i = 0; i < size; i++) {

  18.         pool[i] = processorConstructor.newInstance(this.executor);

  19.     }

  20. }

从以上代码可以看出, SimpleIoProcessorPool<NioSession>根据传入的IoProcessor的类型(这里为NioProcessor),创建了一个IoProcessor池,和一个Executor线程池,用来进行网络IO的读写请求。

那我们就来看一下 NioProcessor的具体创建过程

  1. public NioProcessor(Executor executor) {

  2.     super(executor);

  3.     this.selector = Selector.open();

  4. }

可以看出,在NioProcessor创建的过程中,还创建了一个Selector对象,用来之后监听网络IO的读写请求。并把传进来的Executor对象传递给父类进行保存。

之后我们再来看一下  AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>的父类 AbstractIoAcceptor

  1. protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {

  2.     super(sessionConfig, executor);

  3. }

可以看出该构造函数没有进行额外的工作,而是直接调用父类。那我们就来看一下他的父类 AbstractIoService

  1. protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {

  2.     // 保存配置对象

  3.     this.sessionConfig = sessionConfig;

  4.     // 若为提供Executor对象,则新建一个。

  5.     if (executor == null) {

  6.         this.executor = Executors.newCachedThreadPool();

  7.         createdExecutor = true;

  8.     } else {

  9.     this.executor = executor;

  10.     createdExecutor = false;

  11.     }

  12. }

至此, NIOSocketAcceptor对象的创建也就到此结束。在整个创建过程中,我们创建了一个 DefaultSocketSessionConfig实例,用来表示一些配置选项。创建了一个Selector对象, 和一个Executor线程池,用来负责监听用户的请求(详见后文)。创建了一个  SimpleIoProcessorPool<NioSession>对象,其中包括一组 NioProcessor对象和一个Executor线程池对象,每个NioProcessor对象中还包括一个Selector对象,用来负责网络IO的读写处理(详见后文)。

2.3.2、bind

我们接着来看一下IoAcceptor中声明的bind方法。其现实在AbstractIoAcceptor中。代码如下:

  1. public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {

  2.     // 检查地址的类型,并加入到localAddressCopy中

  3.     List<SocketAddress> localAddressesCopy = new ArrayList<SocketAddress>();

  4.     for (SocketAddress a : localAddresses) {

  5.         checkAddressType(a);

  6.         localAddressesCopy.add(a);

  7.     }

  8.     // 调用bindInternal函数,返回成功绑定的地址。

  9.     Set<SocketAddress> addresses = bindInternal(localAddressesCopy);

  10.     synchronized (boundAddresses) {

  11.             boundAddresses.addAll(addresses);

  12.     }

  13. }

下面我们来看一下bindInternal的实现,该函数的实现在AbstractPollingIoAcceptor<NioSession, SocketChannel>中。

  1. @Override

  2. protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {

  3.     // 创建一个AcceptorOperationFuture对象,该对象的执行结果可以异步的通知

  4.     AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);

  5.     // 将上文创建的对象加入队列中,等待其他线程处理

  6.     registerQueue.add(request);

  7.     // 启动Acceptor,进行地址的绑定

  8.      startupAcceptor();

  9.     // 唤起之前阻塞的Selector对象(详见下文)

  10.     wakeup();

  11.     // 等待绑定完成

  12.     request.awaitUninterruptibly();

  13.     // 返回结果

  14.     Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

  15.     for (H handle : boundHandles.values()) {

  16.         newLocalAddresses.add(localAddress(handle));

  17.     }

  18.     return newLocalAddresses;

  19. }

可以看出,在该函数中,首先创建了一个 AcceptorOperationFuture对象,并把其加入一个队列中,等待其他线程处理。该对象类似Java中自带的Future<?>对象,代表一个异步执行任务的结果。 自后调用了startupAcceptor来进行连接的监听和绑定。下面我们就来看一下startupAcceptor函数的具体实现。

  1. private void startupAcceptor() {

  2.     // 如果acceptorRef没个赋值,则新建一个Acceptor对象进行赋值,并执行该acceptor。

  3.     Acceptor acceptor = acceptorRef.get();

  4.     if (acceptor == null) {

  5.     acceptor = new Acceptor();

  6.     if (acceptorRef.compareAndSet(null, acceptor)) {

  7.         executeWorker(acceptor);

  8.     }

  9. }

从代码可以看出,该函数进行了一个原子操作。如果acceptorRef没被赋值,则新建对象进行赋值,并调用executeWroker函数。其中Acceptor实现了Runnable接口,而executorWorker函数的主要功能就是把该acceptor交给之前创建的Executor来执行。下面我们来看一下Acceptorrun函数,看一下它的具体执行过程。

  1. public void run() {

  2.     int nHandles = 0;

  3.     while (selectable) {

  4.         // 等待用户连接请求

  5.         int selected = select();

  6.         // 注册新的地址绑定

  7.         nHandles += registerHandles();

  8.         // 没有绑定的地址,可以退出

  9.         if (nHandles == 0) {

  10.             acceptorRef.set(null);

  11.             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {

  12.                 break;

  13.            }

  14.            if (!acceptorRef.compareAndSet(null, this)) {

  15.                break;

  16.            }

  17.         }

  18.         // 处理用户连接请求

  19.         if (selected > 0) {

  20.              processHandles(selectedHandles());

  21.         }

  22.        // 检查是否有unbind请求.

  23.         nHandles -= unregisterHandles();

  24.     }

  25. }

其中的select()函数的实现在 NIOSocketAcceptor类中:

  1. @Override

  2. protected int select() throws Exception {

  3.     return selector.select();

  4. }

可以看出,select函数在之前创建的Selector上调用select函数。等待绑定在该Selector上的Channel状态就绪。由于在第一次调用时,该Selector上并没有绑定任何Channel,所以该函数会永远阻塞住。这也就是为什么在之前的 bindInternal中,在调用 startAcceptor后,需要马上调用wakeup函数。该函数的实现在 NIOSocketAcceptor类中:

  1. @Override

  2. protected void wakeup() {

  3.     selector.wakeup();

  4. }

该函数会调用Selectorwakeup函数,用来唤醒阻塞的select函数。

接下来我们来看一下registerHandlers函数,该函数的作用是用来检查是否有地址绑定的请求。并进行绑定。(unRegisterHandles与之相似,过程相反,这里略去不提。)

  1. private int registerHandles() {

  2.     for (;;) {

  3.         // 从队列中获得绑定任务

  4.         AcceptorOperationFuture future = registerQueue.poll();

  5.         // 队列为空,直接退出,返回0

  6.         if (future == null) {

  7.            return 0;

  8.         }

  9.         Map<SocketAddress, ServerSocketChannel> newHandles = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();

  10.         // 获得需要绑定的地址

  11.         List<SocketAddress> localAddresses = future.getLocalAddresses();

  12.         // 一次绑定

  13.         for (SocketAddress a : localAddresses) {

  14.             // 为每个地址创建ServerSocketChannel对象

  15.             ServerSocketChannel handle = open(a);

  16.             newHandles.put(localAddress(handle), handle);

  17.         }

  18.         // 更新已绑定的连接

  19.         boundHandles.putAll(newHandles);

  20.         // 通知 AcceptorOperationFuture任务已经完成

  21.         // 这样之前调用awaitUninterruptibly()阻塞的线程将继续执行。 

  22.         future.setDone();

  23.         // 返回已绑定的个数

  24.         return newHandles.size();

  25.     }

  26. }

该函数的作用是检查是否有绑定的请求,然后为每个地址建立一个连接,并绑定。其中创建连接调用的是open函数。该函数的实现在 NIOSocketAcceptor类中:

  1. @Override

  2. protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {

  3.       // 创建一个新的ServerSocketChannel

  4.      ServerSocketChannel channel = ServerSocketChannel.open();

  5.     // 设置为非阻塞

  6.     channel.configureBlocking(false);

  7.     // 绑定地址

  8.     ServerSocket socket = channel.socket();

  9.     socket.bind(localAddress, getBacklog());

  10.     // 在Selector对象中注册该Channel。

  11.     channel.register(selector, SelectionKey.OP_ACCEPT);

  12.     return channel;

  13. }

我们现在再把注意力移回之前的Acceptorrun函数。如果之前的select函数是正常返回,而不是被wakeup,那么说明有用户的连接请求。接下来就会执行processHandles函数。其实现如下:

  1. private void processHandles(Iterator<ServerSocketChannel> handles) throws Exception {

  2.    while (handles.hasNext()) {

  3.         ServerSocketChannel handle = handles.next();

  4.         handles.remove();

  5.         // 接受用户的请求,并创Session。

  6.         NioSession session = accept(processor, handle);

  7.         // 初始化Session

  8.        initSession(session, null, null);

  9.        // 把session添加到一个Processor的处理队列中,等待IO读写处理

  10.        session.getProcessor().add(session);

  11.     }

  12. }

下面我们来看一下accept函数。该函数接收两个参数,一个是有连接请求的ServerSocketChannel,一个是之前创建的SimpleIoProcessorPool<NioSession>对象。

该函数的实现在NIOSocketAcceptor类中:

  1. @Override
  2. protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
  3.     // 获得用户连接
  4.     SocketChannel ch = handle.accept();
  5.     // 创建Session
  6.     return new NioSocketSession(this, processor, ch);
  7. }

之后我们来看一下Session在Processor上的注册过程。及SimpleIoProcessorPool<NioSession>的add函数:

  1. public final void add(S session) {

  2.     // 从Processor池中获得一个Processor(NioProcessor),并注册session.

  3.     getProcessor(session).add(session);

  4. }

  5. private IoProcessor<S> getProcessor(S session) {

  6.     // 如果该session已经注册一个Processor,则返回该Processor

  7.     // 如果不存在,则从池中随机选择一个,并绑定到该session上

  8.     IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);

  9.     if(processor == null) {

  10.         processor = pool[Math.abs((int) session.getId()) % pool.length];

  11.         session.setAttributeIfAbsent(PROCESSOR, processor);

  12.     }

  13.      return processor;

  14. }

接下来我们来看一下NioProcessor中的add函数。整个过程和之前监听用户连接请求的过程相似。故我们只关注其中不同的地方

  1. public final void add(S session)  {

  2.     newSessions.add(session); // 加入请求队列

  3.     startupProcessor();

  4. }


  5. private void startupProcessor() {

  6.     Processor processor = processorRef.get();

  7.     if (processor == null) {

  8.         processor = new Processor();

  9.         if (processorRef.compareAndSet(null, processor)) {

  10.             executor.execute(new NamePreservingRunnable(processor, threadName));

  11.         }

  12.      }

  13.      wakeup();

  14. }


  15. public void run() {

  16.     int nSessions = 0;

  17.     for (;;) {

  18.         int selected = select(SELECT_TIMEOUT);

  19.         // 处理新的session的注册

  20.         nSessions += handleNewSessions();

  21.         if (selected > 0) {

  22.             process();

  23.         }

  24.         // 刷新排队的写请求

  25.         flush(currentTime);

  26.         nSessions -= removeSessions();

  27.         if (nSessions == 0) {

  28.             processorRef.set(null);

  29.             if (newSessions.isEmpty() && isSelectorEmpty()) {

  30.                 break;

  31.             }

  32.             if (!processorRef.compareAndSet(null, this)) {

  33.                 break;

  34.             }

  35.        }

  36. }

接下来我们来看一下其中的 handleNewSessions函数

  1. private int handleNewSessions() {

  2.     int addedSessions = 0;

  3.     for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {

  4.         if (addNow(session)) {

  5.             addedSessions++;

  6.         }

  7.     }

  8.     return addedSessions;

  9. }

  10. private boolean addNow(S session) {

  11.     boolean registered = false;

  12.     // 注册session

  13.     init(session);

  14.     registered = true;

  15.     // 构建FilterChain

  16.     IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();

  17.     chainBuilder.buildFilterChain(session.getFilterChain());

  18.     return registered;

  19. }

  20. @Override

  21. protected void init(NioSession session) throws Exception {

  22.     // 把Session对应的SocketChannel注册在该session对应的processor的Selector。

  23.     SelectableChannel ch = (SelectableChannel) session.getChannel();

  24.     ch.configureBlocking(false);

  25.     session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ,session));

  26. }

接下来我们再来看一下process函数。该函数负责数据的读写,并把数据传递给FilterChain,并最终调用用户的IoHandler
  1. private void process() throws Exception {

  2.     for (Iterator<S> i = selectedSessions(); i.hasNext();) {

  3.         S session = i.next();

  4.         process(session);

  5.         i.remove();

  6.     }

  7. }

  8. private void process(S session) {

  9.     // 读请求

  10.     if (isReadable(session) && !session.isReadSuspended()) {

  11.         read(session);

  12.     }

  13.     // 写请求

  14.     if (isWritable(session) && !session.isWriteSuspended()) {

  15.         if (session.setScheduledForFlush(true)) {

  16.            // 加入写刷新队列

  17.            flushingSessions.add(session);

  18.         }

  19.     }

  20. }


  21. private void read(S session) {

  22. // 从SocketChannel中读数据

  23. // 这里略去

  24. // 如果读到数据

  25. if (readBytes > 0) {

  26. // 把数据传给FilterChain

  27. IoFilterChain filterChain = session.getFilterChain();

  28. filterChain.fireMessageReceived(buf);

  29. }

  30. }

最后是flush函数,用来刷新写请求的数据。这里暂时略去。
因为我已经写不动了,累死了。哪天在另开文章吧。

Nutch中Hadoop的应用之Injector

本文首发自我和其他人合作的集体博客中。地址如下,欢迎去围观。

http://my.oschina.net/intellicapacity/blog/17175

很受伤!

昨天晚上本来要去听学校古琴社的音乐会的,可是淘宝的一个电话弄的我兴趣全无。

电话里面我被告知,暑假的实习职位是“产品技术部-淘宝商城”。听后我这个囧呀,本来不是说好的“开放平台”吗?怎么成“淘宝商城”了。我还特意问了下这个部门是怎么分的,结果被告知是随机。听后我又再次囧了,部门原来还可以随机分。

后来我又问了下这个部门是干什么的,结果和我猜的一样,主要就是针对淘宝商城的WEB开发。听后我这个崩溃呀,个人对于WEB开发不是太感兴趣的。

之前宣讲的时候,说职位是按自己的意愿和公司的实际情况协调分配。我后来为了保险,怕被随便分个部门,面试后还特意联系了下开放平台的“放翁”大神,说想去他们部门,他当时也答应了。可是竟然还是悲剧了。

我昨天听到这个职位分配的时候,并没太失望,因为我想可能放翁没和他们说清楚,还可以调整一下,所以我后来就又邮件问了下他。他今天上午回复了我,但结果却是让人失望的。他说是因为他们内部的沟通问题,没把我要到他们部门,还说哪个部门不重要,都能学到知识。意思就是不想帮忙换了。

收到这个邮件后,我是真的纠结了。为了淘宝这个职位,我把所有其他的offer都拒掉了,结果后来却换来个随机职位。我当时想去淘宝,一方面是因为杭州的原因,还有淘宝的江湖文化,但最重要的还是淘宝宣讲时提到的那些技术和相关岗位。面试的时候,我也提到过想进淘宝的一些相关部门,但放翁未置可否,说要进去统一协调分配。而他们面试官可以选一些人到自己的部门。我之前也了解过,放翁是开放平台的。虽然当时并不了解开放平台的具体工作,但是感觉这个部门是个较新的部门,而且现在开放也是个大潮,还有放翁本身确实很有人格魅力,所以当时就决定要去开放平台了。应为即便不了解开放平台的具体任务,但是也总比进去后在像没头苍蝇一样乱转强,万一被分个自己不太喜欢的职位就悲剧了。

但是事与愿违,最怕什么他就来什么。结果弄到最后,还是个随机的部门,而且人品不佳,没随机到自己想要的。昨天和“淘宝商城”的罗松聊的时候,我不停的在问这个部门具体是干什么的,会用到什么,我感兴趣的方面在这个部门有应用没?他后来也感觉到了我的顾虑,说了和今天放翁差不多的话,“你放心,进来你肯定能学到东西的”。能学到东西,这句话确实不假。我也相信想淘宝这样的公司,任何部门都是有好多东西可以学习的。但是可以学习并不代表我就喜欢那些东西。我来淘宝并不是因为淘宝这个公司,而是我看中的这个公司的某些方面。这和我高考差不多,我来南大的原因不是因为南大怎样怎样,而是我分不够清华的软件学院,而南大的软件又排名很靠前,仅此而已。不知道其他人怎么个情况,反正我高考和找实习目的性都是很强的,不是说进个好学校或者好公司就完事了,而是要看到底时什么专业,什么职位。

说了这么多,无非就是发发牢骚。因为为了淘宝,我把其他的offer都拒了,其中不乏有高工资的和我很喜欢的职位的。我还很HAPPY的等着暑假去杭州,还很HAPPY的注册了淘宝和支付宝的帐号,还HAPPY的为自己将要成为淘宝的一员而高兴。可结果却是这么出乎意料。正所谓期望越高,失望越大吧。这就像你为了一个女生,把很多追求你的并且条件很好的美女都给拒绝了,可是后来那个女生却把你甩了,这能不让人受伤嘛!

当然这事也不能怨谁。人家没和你商量就给你分个职位又能怎样,毕竟人家那么多实习生,也不能挨个问是吧。况且人家给你机会去实习,还给你工资,住宿,已经是给你面子了,你还挑三捡四什么。还有人家放翁也不过就是口头外加邮件答应了你,说说而已,你自己要是认真了,那你就输了。总之要怪就怪自己对职位挑三捡四,怪自己人品不好吧。

今天得知这个消息后,我又厚着脸皮问了前几天才拒绝的盛大,结果人家大人不计小人过,很大度的还肯要我。而且那个职位还是我很喜欢的,并且还有个学长在那个部门。之前我最纠结的也是在盛大和淘宝之间选择。后来因为种种原因选了淘宝,可是最终还是去了盛大,这也算是命运的冥冥安排吧。

Hadoop RPC分析之服务端

RPC即Remote Procedure Call的简称。Hadoop各个“模块”(如NameNode, DataNode等)之间就是使用RPC来进行通信。下面我们来对Hadoop的RPC实现进行一下分析,来看一下它的庐山真面目。

Hadoop的RPC大致可以分为三部分:服务端,客户端,以及两者之间的调用协议。

我们首先来看一下org.apache.hadoop.ipc.RPC.Invocation

该类实现了Writable接口。保存了客户端向服务端发起调用的具体信息。如调用的函数名,参数类型和参数值。

然后我们来看一下服务端的实现情况。首先是抽象父类org.apache.hadoop.ipc.Server以及子类org.apache.hadoop.ipc.RPC.Server

这两个类是服务端的核心。通过父类的构造函数我们可以看出,要构建一个Server,需要传入Server需要监听的地址和端口,还有客户端传来的参数类型(即上面提到的org.apache.hadoop.ipc.RPC.Invocation),以及处理的线程数(后文会提到)。之后调用start()函数即可启动Server。

下面我们来详细看一下org.apache.hadoop.ipc.Server的构造函数(为了方便理解,已经忽略一些和安全相关的细节和次要信息):

  1. listener = new Listener();
  2. responder = new Responder();

可以看出在构造函数中新建了两个对象。分别是org.apache.hadoop.ipc.Server.Listenerorg.apache.hadoop.ipc.Server.Responder。其中的Listener是用来监听客户端的连接请求的,而Responder是用来处理请求回复的。

接下来我们来看一下org.apache.hadoop.ipc.Serverstart()函数:

  1. responder.start();
  2. listener.start();
  3. handlers = new Handler[handlerCount];
  4. for (int i = 0; i < handlerCount; i++) {
  5.     handlers[i] = new Handler(i);
  6.     handlers[i].start();
  7. }

可以看出,这个函数首先启动了之前在构造函数中创建的两个线程对象。之后又新建了一组Handler对象,并启动。其中org.apache.hadoop.ipc.Server.Handler继承自Thread类,用来执行用户的实际请求,该类将在后面详细介绍。

下面我们来看一下org.apache.hadoop.ipc.Server.Listener

首先要说明的是,这个类继承自Java的Thread类。上文提到,这个类是用来监听用户连接请求的。下面我们来看一下它的构造函数:

  1. // 创建Socket,并绑定到制定的地址和端口
  2. address = new InetSocketAddress(bindAddress, port);
  3. acceptChannel = ServerSocketChannel.open();
  4. acceptChannel.configureBlocking(false);
  5. bind(acceptChannel.socket(), address, backlogLength);
  6. // 创建Selector
  7. selector = Selector.open();
  8. // 创建readThreads个线程,具体负责读取用户的调用请求
  9. readers = new Reader[readThreads];
  10. for (int i = 0; i < readThreads; i++) {
  11.     Selector readSelector = Selector.open();
  12.     Reader reader = new Reader(“Socket Reader #” + (i + 1) + ” for port ” + port, readSelector);
  13.     readers[i] = reader;
  14.     reader.start();
  15. }
  16. // 注册selector
  17. acceptChannel.register(selector, SelectionKey.OP_ACCEPT);

在构造函数中出现了一个我们还没有介绍的类,org.apache.hadoop.ipc.Server.Listener.Reader,这个类也继承自Thread类。其作用是用来读取用户请求的,我们会在稍后介绍。

下面我们再来看一下org.apache.hadoop.ipc.Server.Listenerrun()函数:

  1. while (running) {
  2.     SelectionKey key = null;
  3.     // 等待连接
  4.     selector.select();
  5.     // 遍历所有就绪的连接,并调用doAccept()函数
  6.     Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  7.     while (iter.hasNext()) {
  8.         key = iter.next();
  9.         iter.remove();
  10.         if (key.isValid()) {
  11.             if (key.isAcceptable()) {
  12.                 doAccept(key);
  13.             }
  14.         }
  15.     }
  16. }

从函数我们可以看出,Listener线程启动后,会等待用户的连接,当收到用户连接后,会调用了doAccept()函数。

所以我们再来看一下org.apache.hadoop.ipc.Server.ListenerdoAccept()函数:

  1. while ((channel = server.accept()) != null) {
  2.     //获得用户连接的channel。
  3.     channel.configureBlocking(false);
  4.     // 获得一个Reader对象
  5.     Reader reader = getReader();
  6.     // 把用户连接的channel绑定到Reader对象上
  7.     SelectionKey readKey = reader.registerChannel(channel);
  8.     //新建一个Connection对象,并绑定到readKey对象上。
  9.     // Connection类封装了用户的连接和数据读取
  10.     // Connection类会在下面详细介绍
  11.     c = new Connection(readKey, channel, System.currentTimeMillis());
  12.     readKey.attach(c);
  13. }

可以看出,doAccept()函数把每个用户的连接SocketChannel绑定到一个Reader对象(Reader对象在Listener构造函数中被创建)上进行处理。下面我们就来看一下org.apache.hadoop.ipc.Server.Listener.Reader类。

现在我们来看一下其中的run()函数:

  1. // 等待连接
  2. readSelector.select();
  3. Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
  4. // 对每个连接,调用doRead()函数
  5. while (iter.hasNext()) {
  6.     key = iter.next();
  7.     iter.remove();
  8.     if (key.isValid()) {
  9.         if (key.isReadable()) {
  10.             doRead(key);
  11.     }
  12. }

可以看出,当Reader对象线程在Listener构造函数中被创建和启动后,会阻塞在select()函数调用。当Listener对象的doAccept()中把用户的连接注册到Reader对象上时,解除阻塞,Reader对象开始处理用户的连接。

可以看到,上面的函数调用了doRead()函数,下面我们来看一下org.apache.hadoop.ipc.Server.ListenerdoRead()函数:

  1. // 获得之前绑定在SelectionKey上的Connection对象
  2. Connection c = (Connection) key.attachment();
  3. // 调用readAndProcess()函数进行用户函数请求的读取和处理
  4. count = c.readAndProcess();

我们已经几次提到了Connection类。上文中也提到,它是对用户一次连接的封装,封装了对数据的读取和处理。下面我们就来具体看一下这个org.apache.hadoop.ipc.Server.Connection类。

下面我们来看一下readAndProcess()函数(忽略了一些安全处理和消息头处理等次要信息):

  1. ByteBuffer dataLengthBuffer = ByteBuffer.alllocate(4);
  2. ByteBuffer data = null;
  3. // 读取参数长度
  4. channelRead(channel, dataLengthBuffer);
  5. dataLengthBuffer.flip();
  6. dataLength = dataLengthBuffer.getInt();
  7. data = ByteBuffer.allocate(dataLength);
  8. // 读取实际参数
  9. channelRead(channel, data);
  10. processData(data.array());

从上面函数可以看出,Hadoop RPC的消息格式(不包括头信息)为数据长度+实际数据。

下面我们再来看一下processData()的实现:

  1. // 新建数据流,并从中读取一个int值
  2. DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
  3. int id = dis.readInt(); 
  4. // 通过反射构建一个对象来保存用户传来的具体请求,即前文提到的Invocation
  5. Writable param = ReflectionUtils.newInstance(paramClass, conf);
  6. param.readFields(dis);
  7. // 新建一个Call对象,并加入请求队列
  8. // Call类是对用户请求的封装
  9. Call call = new Call(id, param, this);
  10. callQueue.put(call);

可以看出,用户请求的消息体包括一个请求ID和请求的实际内容。Reader对象利用Connection把用户的请求从流中读出,并封装成Call对象。但是并没有处理,而是把Call对象加入一个请求队列中,等待其他线程处理。

那这些Call对象最终是被谁处理的呢?答案就是我们上文提到的,在org.apache.hadoop.ipc.Serverstart()函数中创建的Handler对象。现在我们来看一下org.apache.hadoop.ipc.Server.Handler

我们来看一下它的run()函数的实现:

  1. ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
  2. // 从请求队列中取出一个Call对象来处理
  3. final Call call = callQueue.take();
  4. // 执行用户的请求,并得到结果
  5. Writable value = call(call.connection.protocol, call.param, call.timestamp);
  6. // 根据执行的结构构建回复,把结果保存在Call对象中
  7. setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);
  8. // 把回复交给responder来处理。
  9. responder.doRespond(call);

所以可以看出,当Handler线程启动后,会不断的从请求队列中取出Call对象,并处理。然后将处理的结果交给Responser对象处理。

其中调用的call()为抽象函数,在子类org.apache.hadoop.ipc.RPC.Server中实现。代码如下:

  1. Invocation call = (Invocation) param;
  2. // 获得要调用的函数
  3. Method method = protocol.getMethod(call.getMethodName(),call.getParameterClasses());
  4. method.setAccessible(true);
  5. //通过反射,调用函数
  6. Object value = method.invoke(instance, call.getParameters());
  7. //返回结果
  8. return new ObjectWritable(method.getReturnType(), value);

其中的responder为在org.apache.hadoop.ipc.Server的构造函数中创建的对象,负责处理回复。

终于到了最后一步,我们来看一下请求是如何返回给用户的。首先我们来看一下org.apache.hadoop.ipc.Server.Responder类:

我们先来看一下doResponse()函数:

  1. // 可以看出,每个Connection对象都有一个回复队列,缓存还未发送给用户的回复
  2. synchronized (call.connection.responseQueue) {
  3.     // 添加到队列尾部
  4.     call.connection.responseQueue.addLast(call);
  5.     // 只有一个对象在队列中
  6.     if (call.connection.responseQueue.size() == 1) {
  7.         processResponse(call.connection.responseQueue);
  8.     }
  9. }

接下来是processResponse()函数:

  1. // 获得队列中第一个对象
  2. call = responseQueue.removeFirst();
  3. SocketChannel channel = call.connection.channel;
  4. // 向用户写入数据
  5. int numBytes = channelWrite(channel, call.response);
  6. // 由于我们的Socket是非阻塞模式,所以一次写入可能不能写入全部内容。
  7. // 所以如果我们没有写入全部数据,则把Call对象在加入队首。
  8. // 交由异步线程处理。
  9. if (call.response.hasRemaining()) {
  10.     call.connection.responseQueue.addFirst(call);
  11.     channel.register(writeSelector, SelectionKey.OP_WRITE, call);
  12. }

下面我们来看一下和异步处理有关的几个函数,首先是run():

  1. writeSelector.select(PURGE_INTERVAL);
  2. Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
  3. while (iter.hasNext()) {
  4.     SelectionKey key = iter.next();
  5.     iter.remove();
  6.     if (key.isValid() && key.isWritable()) {
  7.         doAsyncWrite(key);
  8.     }
  9. }

这个函数和之前异步处理的类似。首先会阻塞在select函数上,当其他线程把某个Channel注册到这个函数等待的Selector上,并且有数据处理的时候,函数解除阻塞,继续执行。

最后我们来看一下doAsyncWrite()函数:

  1. Call call = (Call) key.attachment();
  2. processResponse(call.connection.responseQueue);

这样用户请求的结果就最终返回给用户,完成一次RPC请求。

综上所述,当Hadoop RPC的服务端开启时,会新建一个Listener线程用来监听用户的连接请求;新建一个Responsder线程来处理回复;新建若干Handler线程来实际执行用户的请求。几个线程之间异步进行。其中的Listener线程中,还会创建若干个Reader线程来读取用户请求的数据,Reader线程会阻塞,直到Listener线程接受到用户的请求,并把请求传递给Reader线程。Reader线程读取用户数据后,会把数据封装成Call对象,放入待处理的队列中。而Handler线程则会不停的从待处理队列中取出对象,处理,并把处理的结果交给Responsder对象处理。Responsder对象会尝试一次以同步的方式写入数据。由于是非阻塞模式,可能存在剩余数据,则把剩余的数据放入每个连接中的等待队列中,交给异步线程处理。

选择!选择!选择!

五一假期后开学的第一天,本来没什么特别的。

下午正在机房的时候,突然手机响了,一看是上海的号,可是看着貌似又不是之前盛大的,一接竟然是ebay的,然后竟然还跟我说面试过了,问我要不要去他们那实习。我一听就囧了。ebay面试已经是一周前的事情了,当时前两轮还凑合。可是第三轮的时候,由于自己对那个无聊的群面题目不感兴趣,所以就基本没怎么表现,本以为都挂了,可是今天竟然收到了录用通知。所以我不禁怀疑,难道是我前两轮太出色了(小小的自恋下,嘿嘿!)?后来问了下HR我的职位,是“Search Science”,而且貌似还有一位上街的学长在那。后来又和那个部门的一个员工聊了聊,简单了解下了具体的工作情况。

当时正在纠结要不要去的时候,又接到了”盛大游戏“的offer。这下我就更囧了,一个还没搞定,又来一个。可以说当初面盛大算是一个意外的。最初的时候就随便在大街网上投了个简历,当时也没报什么希望。没想到后来竟然有了回音,让我去上海面试。这一面还真就面过了。这里还要提一下,盛大我那个部门也有一个上届的学长,而且还是位大牛,真正的大牛。接到offer后,我就联系了下那位大牛学长,简单聊了下盛大的工作情况,也问了下上海的消费情况。后来给力的事情发生了,那个学长知道我的实习工资后,竟然主动提议要和他们主管说说给涨点。我当时那个感动呀。在这只能说一句,学长你太给力了。

正在我纠结二选一的时候,又收到了淘宝的offer。当时我就再一次的囧了。从楚汉争霸变成了三足鼎力。淘宝的笔试已经是好久之前的事情了。面试是在上个月的28号。当时面我的是放翁大神。之前只在微薄和程序员杂志上看过大神,那天见到真身,小小的激动了下。大神不仅仅是技术牛人,人格魅力也是没得说。可以说那天不仅仅是面试,对我来说,更是一次聆听大神的教诲。面试开始,大神问了一些基础的问题,后来又以微博为例,考了下我如何解决数据库的瓶颈问题。后来的问题就和技术不怎么相关。那天从大神那里学到了“简单”才是王道,不能单纯的仅仅是追求技术,进入淘宝能教给我们的就是解决问题的能力。那天跟大神交流后,也感觉到自己近期为了找实习,整个人变得有点浮躁,学习技术有点带有找实习的功利性质。所以自己近期是应该好好反思下,沉淀下了。摆正方向才能更好的前进。

没有offer的时候心里着急,offer多了又开始纠结要去哪个。现在总体来说,我的排名是淘宝 > 盛大 > ebay。淘宝排在第一,一是那天跟大神交流很有感触,感觉淘宝确实是个务实不错的公司,还有就是自己对杭州比较有感觉。盛大排第二是因为那个职位和自己的兴趣比较相近,并且有个给力的学长在那,唯一不足的就是在上海,自己对上海没什么特殊感觉。ebay最后主要是个人不太习惯外企的那种氛围,仅此而已。

面试总结

从这学期开始到现在的两个月中,杂七杂八,大大小小的面试也弄过6次了,在此算是做个小小的总结吧!

第一次面的是淘宝的技术产品部,是电话面试。现在想来,那可真是失败的一次面试。面试官先问了一些关于JAVA基础的问题,答的还可以。后来又问了一些关于cookie的问题,这时我就悲剧了。cookie这东西早就接触过,可是却没怎么实践过,所以一被问道细节问题就悲剧了。然后又问了些关于数据库的问题,这一问不要紧,让我知道了我数据库是多么的薄弱,以前以为数据库不就是写写SQL查询嘛,效率优化的问题也都没考虑过,索引,锁也就是知道个大概,全然不知其所以然,结果当然是被一问三不知。面试当然也就是以悲剧收场。虽然面试结果悲剧了,但是这次面试却使我清楚的认识道了自己之前在技术方面的不足,以及将来奋斗的方向。

第二次是百度的奇艺,招的的linux程序设计方向的。这好当时学校正在教授这门课,自己也把《Beginning Linux Programming》从头到尾看了遍,本以为没什么问题的。可是事实就是这么残酷。面试官问的问题自己都知道,但是一旦深入讨论时,才发现自己看的那点简直就是皮毛,而且自己的实践也不够,缺乏对技术的深入理解。可想而知,这次面试自然也是以悲剧收场。面试之后,我才后悔自己当时为什么没有把《APUE》那本书好好研读下。同时也知道自己还嫩的很。

经历了两次面试悲剧后,我对自己的技术水平有了重新的定位,同时也知道了企业和学校所学知识的巨大差距。从那以后,就开始了我的恶补技术之旅。

只要你努力,自然就会有回报的。第三次的面试让我看到了我努力的成果。

第三次还是淘宝的电话面试,不过是产品研发部。这次面试和第一次相比,更偏基础一点。问了JAVA基础,JAVA WEB基础,数据结构,操作系统等知识。除了数据结构和算法答的稍微差点,其他的答的还不错,就很顺利的通过了一面。二面的时候就没怎么问技术问题,然后就这么通过了。当时说好可以7月份去实习。可是天有不测风云呀,计划没有变化快,还没等我来得及高兴,就被告知如果现在去的话,他们可以拍板。但是如果要暑假去的话,那就得参加淘宝的暑期招聘。没办法,所以本来成功的一次面试最终还是以悲剧收场。

第四次是支付宝的电话面试,当时正在图书馆。这次电话面试可以说是一个转折点。那次面试的是JAVA研发工程师职位,所以那个面试官就围绕JAVA的各方面一顿狂问。首先是集合,然后是多线程,然后是网络。整体答的还是很不错的,也能听出来面试官比较满意。后来又连说带笑的问了点关于LINUX的问题。一共面了一个多小时,然后说让我等结果。后来是某天的课上,接到了HR的电话,说让我4月14号去实习。我说现在走不开,可不可以7月份,然后那个HR说向上面反映下,之后就没有消息了。一次成功的面试又再次悲剧。虽然最后悲剧了,但是有一点收获就是大一刚学JAVA的时候看得那本《Thinking in Java》没白看。虽然当时看得很慢很吃力,但是确实给我的JAVA打下了深厚的基础,也是后来学习其他新技术的基石。

第五次是ebay的校园招聘。个人本身对ebay并没有太多兴趣,那天去只是为了想锻炼一下,给接下来的淘宝暑期实习面试攒经验。不过这次招聘也还是比较有意义的。首先,这次招聘是我第一次使用出我这两个月潜心研究的新技术”Hadoop, NoSQL, MySQL优化”。虽然我对于那三项技术还只是入门阶段,但是在面试中的杀伤力却是大大超出我的想象的。这些新技术是很能打动面试官的。这次招聘的另一个亮点就是英语招聘。第一次被英语面试,算是尝尝鲜吧。听懂面试官的英语这个很简单,但是我对自己的口语一直没有什么信心。说着说着自己就没底气了,感觉说的很烂。可是那个面试官却说我口语还不错,还挺满意的。所以看来所谓的英语口语,只要能让对方听懂,就是好口语,语法句型什么的,都是浮云呀。ebay这次面试一直坚持到了最后一轮,不知道最后能是个什么结果。

最近的一次,也就是第六次,是盛大的面试。这次面试与以往不同的是要自己去上海盛大公司。那天参加玩ebay的面试回到寝室的时候,已经是9点多了。回到寝室,简单收拾了一下,就上床睡觉了。第二天早上5点的时候就从床上爬起来,然后坐最早的公交到火车站,然后坐高铁去上海,然后又坐了一个小时的地铁才到了盛大。到了之后,还没缓过来气,就开始了笔试。可是悲剧还没就此结束。那位HR姐姐在给我发卷子的时候发成DBA的了。结果在笔试时间过了一半的时候才发现,所以说我最后相当于答了两套卷子,唉!之后就是面试,面试官开始简单问了下项目的问题,然后又问了下Linux一些基本命令。之后又针对Hadoop和NoSQL问了些问题。整个过程也就20分钟。之后的时间出现了戏剧性的转变,变成了我提问题,面试官来解答。我问了一些自己学习中的疑问,和一些盛大以及其他公司在相关技术上的使用问题。我的问题貌似触动了面试官兴奋的神经,他就开始滔滔不绝的将个不停。之后就是二面,面试官是那个部门的老大。问题主要还是针对Hadoop和NoSQL。老大当时让我说下NoSQL和MySQL的区别。这个问题个人感觉是当天回答的最满意的。当时回答这个问题完全是根据自己这2个月对于NoSQL和MySQL的学习体会,一些例子的研究而总结出来的。我回答完后,可以看出来老大还是很满意的。面试的后半段时间和一面一样,同样变成了我的提问时间,老大也都很耐心的解答了。盛大的这次面试,是我第一次在面试中掌握了主动权。

通过这6次面试可以看出,基础是很重要的,它是学习任何其他技术的基础。所以大学阶段,一定要为技术打下一个扎实的基础。其次,对一个技术要有深入的理解,不能浮于表面,要知其然知其所以然。还有就是学习的技术要与工业界接轨,学校所学的只是只是一点点的皮毛。所以我这里推荐大家可以定期关注下知名企业技术团队的博客,看看企业中到底都在使用些什么技术,然后有针对性的学习下。同时也要关注下业内的一些开发大会,比如近期的QCon。在上面真的可以学习到很多,同时也能开阔自己的眼界。