[原]red5知识共享

此文是从之前做的red5知识分享PPT中摘取,内容比较简单粗略。当时在讯飞做爱吼网流媒体服务smp,对red5有点了解。

Red5中AMF格式解析

  1. AMF0数据类型及标志位

    • Number - 0x00 (Encoded as IEEE 64-bit double-precision floating point number)
    • Boolean - 0x01 (Encoded as a single byte of value 0x00 or 0x01)
    • Object - 0x03 (Set of key/value pairs)
    • Null - 0x05
    • ECMA Array - 0x08 (32-bit entry count)
    • Object End - 0x09 (preceded by a empty 16-bit string length)
    • Strict Array - 0x0a (32-bit entry count)
    • Date - 0x0b (Encoded as IEEE 64-bit double-precision floating point number with 16-bit integer timezone offset)
    • Long String - 0x0c (32-bit integer string length with UTF-8 string)
    • XML Document - 0xf0 (32-bit integer string length with UTF-8 string)
    • Typed Object - 0x10 (16-bit integer name length with UTF-8 name, followed by entries)
    • Switch to AMF3 - 0x11
  2. AMF3数据类型及标志位

    • Undefined - 0x00
    • Null - 0x01
    • Boolean False - 0x02
    • Boolean True - 0x03
    • Integer - 0x04 (expandable 8+ bit integer)
    • Double - 0x05 (Encoded as IEEE 64-bit double-precision floating point number)
    • String - 0x06 (expandable 8+ bit integer string length with a UTF-8 string)
    • XML - 0x07 (expandable 8+ bit integer string length and/or flags with a UTF-8 string)
    • Date - 0x08 (expandable 8+ bit integer flags with a IEEE 64-bit double-precision floating point UTC offset time)
    • Array - 0x09 (expandable 8+ bit integer entry count and/or flags with optional expandable 8+ bit integer name lengths with a UTF-8 names)
    • Object - 0x0A (expandable 8+ bit integer entry count and/or flags with optional expandable 8+ - bit integer name lengths with a UTF-8 names)
    • XML End - 0x0B (expandable 8+ bit integer flags)
    • ByteArray - 0x0C (expandable 8+ bit integer flags with optional 8 bit byte length)
  3. Red5中AMF格式解析实现

    red5 amf格式解析实现


    Example

    • 解析AMF数据

      1
      2
      3
      4
      5
      6
      IoBuffer buffer = IoBuffer.wrap(bytes);   //bytes 为AMF0格式byte数组
      Input input = new org.red5.io.amf.Input(buffer);
      Deserializer deserializer = new Deserializer();
      String str = deserializer.deserialize(input, String.class);
      Object object = deserializer.deserialize(input, Object .class);

    • 构造AMF数据

      1
      2
      3
      4
      String str = “test”;
      Output output = new org.red5.io.amf.Output(buffer);
      Serializer serializer = new Serializer();
      serializer.serialize(output, str);

Red5解析RTMP消息包

  • 背景知识
    Mina框架核心部分框架
    mina架构图
    Mina中由Filter过滤器处理来自网络数据包,然后转交给应用层的IoHandler做具体的业务处理

  • RMTP消息包解析
    RTMPMinaProtocolDecoder接收来自Mina的数据,真正做数据解析是由RTMPProtocolDecoder类来完成RTMP数据包header和body的解析工作
    red5消息格式解析

  • Header解析
    RTMP包头结构
    包头有4种长度:12, 8, 4, 1 byte(s).
    完整的12字节RTMP包头每个字节的含义:
用途 大小(Byte) 含义
Head_Type 1 包头
TiMMER 3 时间戳
AMFSize 3 数据大小
AMFType 1 数据类型
StreamID 4 流ID

完整的RTMP包头是12bytes,
包含时间戳 ,AMFSize, AMFType,StreamID信息,
8字节的包头只纪录 了时间戳,AMFSize,AMFType,
其他字节的包头纪录信息依次类推

Red5如何将8,4, 1 字节的不完整包头构造成完整的Header消息?
详见:

1
2
3
4
5
6
7
在RTMPMinaProtocolDecoder.decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)方法中
// get our state
final ProtocolState state = (ProtocolState)
session.getAttribute(ProtocolState.SESSION_KEY);

在RTMPProtocolDecoder. decode(ProtocolState state, IoBuffer in)方法中
final RTMP rtmp = (RTMP) state;

Red5通过在IoSession中维护Header消息的map,通过获取上一个header来将缺失的包头信息补齐,具体代码见
RTMPProtocolDecoder. decodeHeader(IoBuffer in, Header lastHeader)方法

      header消息解析

  • Body解析

    Rtmp包默认的最大长度为128字节,(或通过chunksize改变rtmp包最大
    长度), 当AMF数据超过128Byte的时候就可能有多个rtmp包组成,如果需
    要解码的rtmp包太长则被TCP协议分割成多个TCP包.那么解码的时候需要
    先将包含rtmp包的tcp封包合并, 再把合并的数据解码,解码后可得到amf
    格式的数据,将这些AMF数据取出来就可以对AMF数据解码

    Red5在RTMP中像维护header一样维护一份packet的map信息,除此之外还有维护当前的解码状态。Body的解析首先要从header中得到一个完整packet消息的大小,根据当前的接受到的字节大小还有chunksize来判断是否收到一个完整的packet消息的字节,然后再进行body解析与header形成一个业务类,最后转发到RTMPMinaIoHandler做业务处理。

    具体代码见RTMPPotocolDecoder.decodePacket(RTMP rtmp, IoBuffer in)方法和decodeMessage(RTMP rtmp, Header header, IoBuffer in)方法

Hello World

今日把以前博客都迁移过来了,对于个人博客只所以选择hexo就是因为特意中意这个主题。许多选择何尝不是,just I Like

[原]Jackrabbit使用介绍

spring modules jcr

spring-modules-jcr 是jackrabbit与spring集成的第三方模块,可以简化JSR-170 API使用。连接:http://www.infoq.com/cn/articles/spring-modules-jcr

spring-modules-jcr主要是通过JcrTemplate中的execute方法操作jackrabbit的读写。在没有事务的环境下,每一个execute都使用独立的session,方法执行完后立即释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* @see org.springmodules.jcr.JcrOperations#execute(org.springmodules.jcr.JcrCallback,
* boolean)
*/

public Object execute(JcrCallback action, boolean exposeNativeSession) throws DataAccessException {
Session session = getSession();
//判断当前是否在事务下
boolean existingTransaction = SessionFactoryUtils.isSessionThreadBound(session, getSessionFactory());
if (existingTransaction) {
logger.debug("Found thread-bound Session for JcrTemplate");
}

try {
Session sessionToExpose = (exposeNativeSession ? session : createSessionProxy(session));
Object result = action.doInJcr(sessionToExpose);
// TODO: does flushing (session.refresh) should work here?
// flushIfNecessary(session, existingTransaction);
return result;
}
catch (RepositoryException ex) {
throw convertJcrAccessException(ex);
// IOException are not converted here
}
catch (IOException ex) {
// use method to decouple the static call
throw convertJcrAccessException(ex);
}
catch (RuntimeException ex) {
// Callback code threw application exception...
throw convertJcrAccessException(ex);
}
finally {
if (existingTransaction) {
logger.debug("Not closing pre-bound Jcr Session after JcrTemplate");
}
else {
//不在事务环境下,立即释放session
SessionFactoryUtils.releaseSession(session, getSessionFactory());

}
}
}
~

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void executeTask()
{

logger.debug("get the Gadget class Bean,DagetDao={}", dao);
//添加节点
dao.addNodeIfAbsent("Gadget");
//查询节点
boolean result = dao.nodeExists("Gadget");
logger.debug("add Node success or not ,{}",result);
try{
//删除节点
dao.deleteNodebyName("Gadget");

}
catch (Exception e)
{
e.printStackTrace();
}
//再次查询节点
logger.debug("Node exist or not ,{}",dao.nodeExists("Gadget"));
}

上面测试代码总共4次操作jackrabbit,应该使用4个session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2015-08-20 20:24:42 [main] DEBUG org.springmodules.jcr.JcrSessionFactory -no session holder provider manager set; using the default one
2015-08-20 20:24:42 [thread-0] DEBUG com.huawei.jackrabbit.test.service.GadgetnoTransactionService -get the Gadget class Bean,DagetDao=com.huawei.jackrabbit.test.DAO.GadgetDao@1e22c75
2015-08-20 20:24:42 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Opening JCR Session
2015-08-20 20:24:42 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -Starting from root node. node=node /
2015-08-20 20:24:43 [thread-0] INFO com.huawei.jackrabbit.test.DAO.GadgetDao -Saved node. node=node /Gadget
2015-08-20 20:24:43 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Closing JCR Session
2015-08-20 20:24:43 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Opening JCR Session
2015-08-20 20:24:43 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -Starting from root node. node=node /
2015-08-20 20:24:43 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Closing JCR Session
2015-08-20 20:24:43 [thread-0] DEBUG com.huawei.jackrabbit.test.service.GadgetnoTransactionService -add Node success or not ,true
2015-08-20 20:24:43 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Opening JCR Session
2015-08-20 20:24:43 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -delete the node name =Gadget
2015-08-20 20:24:43 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -the node exist or not ,false
2015-08-20 20:24:43 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Closing JCR Session
2015-08-20 20:24:43 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Opening JCR Session
2015-08-20 20:24:43 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -Starting from root node. node=node /
2015-08-20 20:24:43 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Closing JCR Session
2015-08-20 20:24:43 [thread-0] DEBUG com.huawei.jackrabbit.test.service.GadgetnoTransactionService -Node exist or not ,false

打印日志可以看出4次打开和关闭session

在事务环境下,每个事务都独立使用一个session,事务提交之后立即释放。所以上面的测试代码在事务下只使用一个session

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2015-08-21 09:16:06 [main] DEBUG org.springmodules.jcr.JcrSessionFactory -no session holder provider manager set; using the default one
2015-08-21 09:16:06 [thread-0] DEBUG org.springmodules.jcr.jackrabbit.LocalTransactionManager -Creating new transaction with name [com.huawei.jackrabbit.test.service.GadgetService.executeTask]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
2015-08-21 09:16:06 [thread-0] DEBUG org.springmodules.jcr.jackrabbit.LocalTransactionManager -Opened new session [session-admin-3] for JCR transaction
2015-08-21 09:16:06 [thread-0] DEBUG com.huawei.jackrabbit.test.service.GadgetService -get the Gadget class Bean,DagetDao=com.huawei.jackrabbit.test.DAO.GadgetDao@115470e
2015-08-21 09:16:06 [thread-0] DEBUG org.springmodules.jcr.JcrTemplate -Found thread-bound Session for JcrTemplate
2015-08-21 09:16:07 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -Starting from root node. node=node /
2015-08-21 09:16:07 [thread-0] INFO com.huawei.jackrabbit.test.DAO.GadgetDao -Saved node. node=node /Gadget
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.JcrTemplate -Not closing pre-bound Jcr Session after JcrTemplate
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.JcrTemplate -Found thread-bound Session for JcrTemplate
2015-08-21 09:16:07 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -Starting from root node. node=node /
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.JcrTemplate -Not closing pre-bound Jcr Session after JcrTemplate
2015-08-21 09:16:07 [thread-0] DEBUG com.huawei.jackrabbit.test.service.GadgetService -add Node success or not ,true
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.JcrTemplate -Found thread-bound Session for JcrTemplate
2015-08-21 09:16:07 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -delete the node name =Gadget
2015-08-21 09:16:07 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -the node exist or not ,false
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.JcrTemplate -Not closing pre-bound Jcr Session after JcrTemplate
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.JcrTemplate -Found thread-bound Session for JcrTemplate
2015-08-21 09:16:07 [thread-0] DEBUG com.huawei.jackrabbit.test.DAO.GadgetDao -Starting from root node. node=node /
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.JcrTemplate -Not closing pre-bound Jcr Session after JcrTemplate
2015-08-21 09:16:07 [thread-0] DEBUG com.huawei.jackrabbit.test.service.GadgetService -Node exist or not ,false
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.jackrabbit.LocalTransactionManager -Initiating transaction commit
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.jackrabbit.LocalTransactionManager -Committing JCR transaction on session [session-admin-3]
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.jackrabbit.LocalTransactionManager -Closing JCR session [session-admin-3] after transaction
2015-08-21 09:16:07 [thread-0] DEBUG org.springmodules.jcr.SessionFactoryUtils -Closing JCR Session

上面的日志显示本次操作只使用了[session-admin-3] session

嵌套事务使用
spring中默认配置事务传播机制为PROPAGATION_REQUIRED

如果当前处在事务中调用另一事务方法,会被合成一个事务。图中事务方法2发生异常,进行事务回滚后,事务方法1结束后会提交事务,事务已经回滚再次提交会报异常

1
2
3
encountered an error.org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:252)
at java.util.concurrent.FutureTask.get(FutureTask.java:111)

在实际使用中需要注意嵌套事务的使用。

事务和非事务性能对比

根据节点名来查询jackrabbit节点是否存在,比较在事务和非事务下的性能差异

在事务和非事务下使用50线程分别查询节点并统计查询消耗的平均时间、最大和最大时间
非事务下又分为使用单一session和多session场景,下面是数据统计:

  1. 使用事务,50个线程有50个事务

    1
    -average time:88(ms), max time :94(ms), min time :86(ms)
  2. 非事务,每次查询都使用新session

    1
    -average time:44(ms), max time :54(ms), min time :39(ms)
  3. 非事务,每次查询使用同一session

    1
    -average time:12(ms), max time :23(ms), min time :8(ms)

对比使用单一session耗时最小,性能最佳,使用事务性能最差,但是从事务的ACID特性考虑,使用单一session和多个session不满足原子性要求
上面测试代码executeTask方法在多线程下使用单一session操作会报 javax.jcr.InvalidItemStateException异常,使用多session操作 报javax.jcr.ItemNotFoundException异常。而在事务环境下可以正常完成。
是否使用事务需要从业务的使用场景考虑,如果业务场景就满足原子性操作,或者异常有处理措施保证数据一致性,那么不使用事务也可以,但是这种往往比较复杂对session使用也需要谨慎考虑。
考虑一种场景,一次性添加多个节点,要不全部成功要不全部失败,如果中途失败,前面添加的节点都需要删除,在事务下可以回滚状态,在非事务下就必须代码删除,这样处理很复杂而且容易出错

[译]Jackrabbit并发控制

Apache Jackrabbit内部的并发模型相当复杂,在JackRabbit 1.x发布周期内报告并修复了大量的死锁问题。这篇文章是设计和code review的成果,旨在主动预防其他类似的问题

此文是关于Jackrabbit的内部并发和同步机制模型,不是关于JCR的锁特性。注意此文是从架构层面审阅并发控制,并不关注像个别类或组件的多线程安全之类的问题。
此review是基于JackRabbit 1.5版本的默认版本

架构背景

image

从并发控制方面,Jackrabbit 架构大概可以分成下面的五个层次
1.集群
2.仓库
3.工作空间
4.会话
5.事务

  • 集群
    集群层需要在一个或者多个集群节点恰巧共享它们的内容时,这些节点可以视为独立的仓库,处理这些同步更改。并发控制跨越多个集群节点是通过使用一个单一的写入锁来实现的,这锁是节点在提交任何更改到共享状态时都需要先获取到的。另一方面所有的集群节点都可以在不需要任何明确的同步措施下并行读取已经共享的内容。注意既然所有的集群节点共享一把锁,那么在节点之间就不会发生任何死锁的情况。一个发生死锁的节点仍然会潜在的阻碍写入整个集群,但是如果每个节点本身都是无死锁的,那么集群功能就不会添加任何新的死锁场景
  • 仓库
    仓库层负责所有全局仓库状态比如节点类型注册和版本存储。所有仓库范围的组件都有他们自己的同步机制而不是共享一把全局仓库锁。从并发控制视角来看最值得注意的组件就是版本存储,实际上它有两种锁机制;一个VersionManagerImpl负责高层次的版本操作,另一个底层的SharedItemStateManager为了控制访问底层的持久化机制。
  • 工作空间
    仓库由一个或者多个工作空间组成,工作空间包含这个仓库中正常的内容树状结构。每一个工作空间由一些像持久化机制和查询索引组成。持久化机制通过Shared ISM(共享项状态管理器)和PersistenceManager(持久化管理器)建立的,Shared ISM会控制所有项的操作,PersistenceManager把所有项进行永久存储。大多数的持久化管理使用Java同步和其他锁机制进行并发控制,但是因为它们通常不会与仓库的其他部分相互影响,所以从全局并发角度来看它们并不是关键。另一方面,使用读写锁的SharedISM是关键因素尤其它是与仓库范围的版本存储交互的途径。注意自从Jackrabbit 1.4就可以配置SharedISM的锁策略,通过使用更细粒度的锁配置,允许并发写访问内容树的不同部分。本文关注于默认情况只有一个简单的SharedISM锁,但是从锁角度来看更细粒度的情况相当于拥有更多工作空间因此本文的结论仍然是适用的。
  • 会话
    每一个工作空间可以被零个或多个JCR 会话访问。每个会话拥有一个暂时空间来存储在会话中尚未保存的更改。因为暂时空间是在会话的内部(其他会话不可见)和会话应该每次只被一个线程访问,我们几乎不用关心并发和会话使用会有关联。可是值得注意在许多情况下会话的线程安全需求,Jackrabbit并没有明确地保证,所以一个客户端无论是有意还是无意地在多并发线程中使用单独的一个会话可能会破坏会话的内部状态。
  • 事务
    Jackrabbit处理事务是通过把所有项的操作(如保存暂时更新和直接工作空间更新以及版本和锁操作)封装到一个可以说是更大的暂时空间,当事务被提交时再把它持久化。在Jackrabbit中没有事务锁,但是事务支持仍然从根本上改变了Jackrabbit并发控制比如它基本上通过使用更大的提交操作取代了所有的写操作(还有相关的锁)。只有在XA事务的环境下,事务模式才会被激活。

[译]Jackrabbit是如何工作的

下面的图片解释了当一个用户使用JCR API修改内容仓库时,哪些Jackrabbit的部件会被使用到。这是一个简单而且常用的操作,但会涉及到Jackrabbit的大部分组件。请注意这些实现的架构JCR并没有授权,但是是直接基于JCR设计的

image

下面是写或修改内容仓库时依次使用到的组件和它们各自的功能

  • Transient Item State Manager 暂时项状态管理器
    一旦内容项被会话(session)读取,它们就会被Transient ISM缓存。当这些内容项被修改,在这个所谓的“暂时”空间里这些修改只对同一个session是可见的。

  • Transactional Item State Manager 事务项状态管理器
    当应用使用JCR的Item.save() 或Session.save() 对这些修改项持久化时。这些暂时项被会提升到Transactional ISM。这些修改仍然只在这事务中是可见的,这意味直到被提交前它们对其他session依然是不可见的。如果内容仓库不是运行在XA环境下,这些提交是隐形的。

  • Shared Item State Manager 共享项状态管理器
    一旦事务被提交,Shared ISM会接受到更新日志并把这些更新推送到所有登录到这一工作空间的session。这就意味被其他session缓存和引用到的所有项状态都被会通知,也可能会被更新或者变得无效。Shared ISM 也会触发观察机制并将更新日志传递给这个工作空间配置的持久化管理器。

  • Persistence Manager 持久化管理器
    Persistence Manager会将Shared ISM传递过来的更新日志中的所有项进行持久化操作。持久化管理器简单、快速,事务接口是低层次的不需要理解复杂的仓库操作,只需在底层上根据给定项的ID能够持久化和恢复

  • Observation 观察机制
    当事务被提交,Shared ISM会出发观察机制。这样允许应用在工作空间内异步订阅这些改变。Jackrabbit也提供了同步的观察机制,但这不属于JCR标准之内

  • Query Manager/Index 查询管理/索引
    通过异步观察事件,指示Query Manager索引到新添加项或修改项。内容仓库索引要比经典的关系型数据库索引要复杂的多,因为它要处理内容仓库的特性,比如项的层级,节点继承还有全文检索

[原]windows和Linux查看端口

Windows平台

> 在windows命令行窗口下执行:
>
> 1.查看所有的端口占用情况 
>
>
  netstat -ano

>
>
2.查看指定端口的占用情况

>
>
  netstat -aon|findstr "9050"

>
>
3.查看PID对应的进程

>
>
  tasklist|findstr "9050"

>
>
4.结束该进程

>
>   taskkill /f /t /im xx.exe


Linux查看端口


>
>
>
关闭端口:关闭相应的应用程序就解决端口被占用

>
>
如:

>
>
   “kill -9 PID”

>
>
     通过"netstat -anp | grep ssh"

>
> 有显示:    tcp 0 127.0.0.1:2121 0.0.0.0:* LISTEN 7546/ssh
>
> 则:    "kill -9 7546"



<div>
    作者:qarkly112649 发表于2014/7/7 15:53:16 [原文链接](http://blog.csdn.net/qarkly112649/article/details/37516533)
</div>
<div>
阅读:102 评论:0 [查看评论](http://blog.csdn.net/qarkly112649/article/details/37516533#comments)
</div>

[原]mina中责任链模式的实现



1.责任链模式在mina中有重要的作用,其中Filter机制就是基于责任链实现的,再来回顾mina框架组成






从上图看到消息的接受从IoService层先经过Filter层过滤处理后最后交给IoHander,消息的发送则是反过来从IoHander层经过Filter层再到IoService层。

我们来想想这样做的好处:

第一点就是可以把消息从原始字节封装成对象方便处理,或者从对象到原始字节,那这就是decode和encode过程

第二点过滤消息,业务层只需要处理感兴趣的消息

当然还有其他…

再来想想Filter层是怎么实现的呢

从图中看到接收消息和发送消息经过Filter层是相反处理的,那么每个Filter就必须知道前一个和后一个Filter,我们就很容易想到了双向链表结构。

没错Filter层就是用双向链表实现的,那么让我们来看看Filter层具体是如何实现,即mina中责任链模式的实现


2.先来看下mina的filterchain包结构




Filter层的每个filter都是对上图IoFilter接口的实现,我们将具体讲解IoFilter,IoFilterChain,DefaultIoFilterChain这几个类

IoFilterChainBuilder接口和DefaultIoFilterChainBuilder实现不再细讲,从字面意思就是IoFilterChain的建造者

IoFilterEvent是代表filter事件,IoFilterLifeCycleException是指filter中出现循环链表异常

下面的图是我们要重点讲解的几个类的关系





我们重点讲解的只有四个类,为什么会出现这么多的类呢,有些类使内部实现,不太好表示就都画出来了,先来说明下

IoFilter接口:NextFilter接口是其内部接口

IoFilterAdapter类:对IoFilter接口的实现,是所有Filter的基类

IoFilterChain接口:Entry接口是其内部接口

DefaultIoFilterChain类:是对IoFilterChain接口的实现,有EntryImpl,HeadFilter,TailFilter三个内部类,其中EntryImpl类中又有NextFilter接口的内部实现

还需要说明下:IoFilter还有相关接口就写了两个方法,一个接受消息触发的方法还有一个是发送消息触发的方法,剩下的都是这两类消息处理方法就不表示了

HeadFilter类只对发送消息处理方法重载,TailFilter类只对接受消息处理方法重载,想一想为什么?再想想为什么用内部类来实现?

从上图看到EntryImp类是重点,我们就来看看EntryImpl类的实现


private class EntryImpl implements Entry {
private EntryImpl prevEntry ;

private EntryImpl nextEntry ;

private final String name;

private IoFilter filter ;

private final NextFilter nextFilter;

private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry, String name, IoFilter filter) {
if (filter == null) {
throw new IllegalArgumentException("filter");
}
if (name == null) {
throw new IllegalArgumentException("name");
}

this.prevEntry = prevEntry;
this.nextEntry = nextEntry;
this.name = name;
this.filter = filter;
this.nextFilter = new NextFilter() {

public void sessionOpened(IoSession session) {
Entry nextEntry = EntryImpl. this.nextEntry ;
callNextSessionOpened(nextEntry, session);
}

public void filterWrite(IoSession session, WriteRequest writeRequest) {
Entry nextEntry = EntryImpl. this.prevEntry ;
callPreviousFilterWrite(nextEntry, session, writeRequest);
}

};
}



从EntryImpl类的构造方法看到,EntryImpl中保持对上一个节点和下一个节点引用,双向链表结构,name即过滤层名称,filter即过滤层的具体实现,而nextFilter是在构造方法中的内部实现。


我们再来看看DefaultIoFilterChain的实现


public class DefaultIoFilterChain implements IoFilterChain {
/ The associated session */
private final AbstractIoSession session ;

private final Map<String, Entry> name2entry = new ConcurrentHashMap<String, Entry>();

/
The chain head /
private final EntryImpl head ;

/** The chain tail
/
private final EntryImpl tail ;

public DefaultIoFilterChain(AbstractIoSession session) {
if (session == null) {
throw new IllegalArgumentException("session");
}

this.session = session;
head = new EntryImpl(null, null, "head" , new HeadFilter ());
tail = new EntryImpl(head, null, "tail", new TailFilter());
head.nextEntry = tail;
}



从上图看到,head顾名思义就是头节点,tail即尾节点,两个节点在构造方法中已经构造好了,head里装的是HeadFilter,tail中即TailFilter。上面我们说到HeadFilter只实现发送消息处理方法,TailFilter只实现接受消息处理方法。


想一想便得知,HeadFilter是发送消息最后的处理节点,TailFilter是接受消息最后的处理节点。最后的节点处理就是将写消息交给Io线程处理,将读消息交给IoHander的业务层处理。所以说HeadFilter和TailFilter只需对某一方消息处理,反面消息默认交给下一个节点处理。

HeadFilter


 private class HeadFilter extends IoFilterAdapter {
@SuppressWarnings("unchecked" )
@Override
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {

AbstractIoSession s = (AbstractIoSession) session;

// Maintain counters.
if (writeRequest.getMessage() instanceof IoBuffer) {
IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
// I/O processor implementation will call buffer.reset()
// it after the write operation is finished, because
// the buffer will be specified with messageSent event.
buffer.mark();
int remaining = buffer.remaining();

if (remaining == 0) {
// Zero-sized buffer means the internal message
// delimiter.
s.increaseScheduledWriteMessages();
} else {
s.increaseScheduledWriteBytes(remaining);
}
} else {
s.increaseScheduledWriteMessages();
}

WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();

if (!s.isWriteSuspended()) {
if (writeRequestQueue.size() == 0) {
// We can write directly the message
s.getProcessor().write(s, writeRequest);
} else {
s.getWriteRequestQueue().offer(s, writeRequest);
s.getProcessor().flush(s);
}
} else {
s.getWriteRequestQueue().offer(s, writeRequest);
}
}
}


TailFilter

 private static class TailFilter extends IoFilterAdapter {
@Override
public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
session.getHandler().sessionOpened(session);
}
}


下面我们来看看sessionOpen消息处理的完整过程,首先是IoFilterChain收到这个消息触发fireSessionOpened方法

  public void fireSessionOpened() {
Entry head = this.head ;
callNextSessionOpened(head, session);
}

private void callNextSessionOpened(Entry entry, IoSession session) {
try {
IoFilter filter = entry.getFilter();
NextFilter nextFilter = entry.getNextFilter();
filter.sessionOpened(nextFilter, session);
} catch (Throwable e) {
fireExceptionCaught(e);
}
}




fireSessionOpened方法获取当前的头节点,然后调用callNextSessionOpened方法,而callNextSessionOpened方法是从entry中获取filter和nextfitler,触发filter的sessionOpened方法,同时将nextfilter作为参数传进去,而filter层如果对这个消息感兴趣可以处理完成后调用nextfilter的sessionOpened方法,不感兴趣的话,可能消息到此就结束了。


再回到上面EntryImpl中对NextFilter的实现,我们看到NextFilter收到sessionOpen消息,获取当前节点的下一个节点,然后触发IoFilterChain的callNextSessionOpened方法,即上图所示。再然后就是传递到下一节点处理,要么filter层拦截过滤结束,要不就是传到最后一层由TailFilter交给业务层处理。而写消息恰好相反,nextFilter是获取前一个节点,这就实现了双向过滤的功能。

到这我们就明白了为什么EntryImpl还有NextFilter选择内部类实现了。

NextFilter其实是起到中转站的作用,收到Reveceive消息转交给后一节点,收到Send消息转交给前一个消息。那我们再来想想为什么要用NextFilter来作为中转呢?我想应该是接口隔离的原则。Filter只需要关心如何处理接受到的消息,至于如何转交到下一个Filter不应该由他实现。

至此Mina的责任链实现就已经讲完了,其余细节的东西由读者自己体会吧,在此思考下

IoFilterChain是如何检测链表中是否有环呢?


<div>
    作者:qarkly112649 发表于2014/7/7 9:52:26 [原文链接](http://blog.csdn.net/qarkly112649/article/details/37498251)
</div>
<div>
阅读:631 评论:0 [查看评论](http://blog.csdn.net/qarkly112649/article/details/37498251#comments)
</div>

[原]快排的java两种实现方式

快排是最基础的几个排序算法之一,今天再来回顾下

public class QuickSort {

    public static void quickSort(int[] array){
        if(array != null){
            quickSort(array, 0, array.length-1);
        }
    }

    private static void quickSort(int[] array,int beg,int end){
        if(beg >= end || array == null)
            return;
        int p = partition(array, beg, end);
        quickSort(array, beg, p-1);
        quickSort(array, p+1, end);
    }
}

上面就是快排主要的框架,最重要就是partition方法,它是划分并找到下次分割排序的位置p

常用的partition方法

private static int partition(int[] array, int beg, int end) {
        int first = array[beg];
        int i = beg, j = end;
        while (i < j) {
            while (array[i] <= first && i < end) {
                i++;
            }
            while (array[j] > first && j >= beg) {
                j--;
            }
            if (i < j) {
                array[i] = array[i] ^ array[j];
                array[j] = array[i] ^ array[j];
                array[i] = array[i] ^ array[j];
            }
        }
        if (j != beg) {
            array[j] = array[beg] ^ array[j];
            array[beg] = array[beg] ^ array[j];
            array[j] = array[beg] ^ array[j];
        }
        return j;
    }

第二种partition方法实现

private static int partition(int[] array,int beg,int end){
        int last = array[end];
        int i = beg -1;
        for (int j = beg; j <= end-1; j++) {
            if(array[j] <= last){
                i++;
                if(i != j){
                    array[i] = array[i]^array[j];
                    array[j] = array[i]^array[j];
                    array[i] = array[i]^array[j];
                }
            }
        }
        if((i+1) != end){
            array[i+1] = array[i+1]^array[end];
            array[end] = array[i+1]^array[end];
            array[i+1] = array[i+1]^array[end];
        }
        return i+1;
    }

个人比较喜欢第二种写法,比较简洁

<div>
    作者:qarkly112649 发表于2014/6/29 17:24:20 [原文链接](http://blog.csdn.net/qarkly112649/article/details/35794097)
</div>
<div>
阅读:2000 评论:0 [查看评论](http://blog.csdn.net/qarkly112649/article/details/35794097#comments)
</div>

[原]红黑树的java实现

红黑树的原理以及算法见文章:http://blog.csdn.net/v_JULY_v/article/details/6105630

链接文章中已经把原理讲得很清楚了,本文只讨论java来实现红黑树,主要是链接文章中的算法实现,或者说是算法导论中的算法实现吧

红黑树的节点定义:


package org.algorithm.RedBlackTree;

/**
* Created with IntelliJ IDEA.
* User: qarkly
* Date: 14-6-8
* Time: 下午11:16
* To change this template use File | Settings | File Templates.
*/
public class RedBlackNode {
public static final byte RED = 0x00;
public static final byte BLACK = 0x01;
public static RedBlackNode NILL = new RedBlackNode(BLACK);

RedBlackNode parent;
RedBlackNode left;
RedBlackNode right;
Comparable element;
byte color;

public RedBlackNode(RedBlackNode left,RedBlackNode right,Comparable element){
this.left = left;
this.right = right;
this.element = element;
color = RED;
}

public RedBlackNode(byte color){
this.color = color;
}

public RedBlackNode(Comparable element,byte color){
this.color = color;
this.element = element;
this.left = NILL;
this.right = NILL;
}

}

左旋操作实现:

 /**
* 左旋操作
* @param node
*/
private void leftRotate(RedBlackNode node){
if(node == null || node == RedBlackNode.NILL)
return;
RedBlackNode pNode = node.right;
node.right = pNode.left;
pNode.left.parent = node;
pNode.parent = node.parent;
if(node.parent == RedBlackNode.NILL){
root = pNode;
}else if(node == node.parent.left){
node.parent.left = pNode;
}else {
node.parent.right = pNode;
}
pNode.left = node;
node.parent = pNode;

}

红黑树的插入实现,文章的BR_INSERT(T ,z):

 private void insert(RedBlackNode node){
RedBlackNode preNode = RedBlackNode.NILL;
RedBlackNode curNode = root;
while (curNode != RedBlackNode.NILL){
preNode = curNode;
if(node.element.compareTo(curNode.element) < 0){
curNode = curNode.left;
}else if(node.element.compareTo(curNode.element) >0){
curNode = curNode.right;
}else{
return;
}
}

node.parent = preNode;
if(preNode == RedBlackNode.NILL){
root = node;
}else if(node.element.compareTo(preNode.element) < 0){
preNode.left = node;
}else{
preNode.right = node;
}
node.left = node.right = RedBlackNode.NILL;
node.color = RedBlackNode.RED;
BrInsertFixup(node);

}

插入修复算法RB-INSERT-FIXUP现实:

 private void BrInsertFixup(RedBlackNode node){
RedBlackNode pNode;
while (node.parent.color == RedBlackNode.RED){
if(node.parent == node.parent.parent.left){
pNode = node.parent.parent.right;
if(pNode.color == RedBlackNode.RED){
node.parent.color = RedBlackNode.BLACK;
pNode.color = RedBlackNode.BLACK;
node.parent.parent.color = RedBlackNode.RED;
node = node.parent.parent;
}else {
if (node == node.parent.right){
node = node.parent;
leftRotate(node);
}
node.parent.color = RedBlackNode.BLACK;
node.parent.parent.color = RedBlackNode.RED;
rightRotate(node.parent.parent);
// node = node.parent;
}
}else{
//镜像情况处理
pNode = node.parent.parent.left;
if(pNode.color == RedBlackNode.RED){
node.parent.color = RedBlackNode.BLACK;
pNode.color = RedBlackNode.BLACK;
node.parent.parent.color = RedBlackNode.RED;
node = node.parent.parent;
}else {
if(node == node.parent.left){
node = node.parent;
rightRotate(node);
}
node.parent.color = RedBlackNode.BLACK;
node.parent.parent.color = RedBlackNode.RED;
leftRotate(node.parent.parent);
// node = node.parent;
}
}
}
root.color = RedBlackNode.BLACK;

}

红黑树的删除实现RB-DELETE(T, z)

 private RedBlackNode delete(RedBlackNode node){
if(node == null || node == RedBlackNode.NILL){
return RedBlackNode.NILL;
}

RedBlackNode pNode;

if (node.left == RedBlackNode.NILL || node.right == RedBlackNode.NILL){
pNode = node;
}else {
pNode = findSuccessor(node);
}

RedBlackNode nNode;
if(pNode.left != RedBlackNode.NILL){
nNode = pNode.left;
}else {
nNode = pNode.right;
}

nNode.parent = pNode.parent;
if(pNode.parent == RedBlackNode.NILL){
root = nNode;
}else{
if(pNode == pNode.parent.left){
pNode.parent.left = nNode;
}else{
pNode.parent.right = nNode;
}
}

if(pNode != node){
node.element = pNode.element;
}

if(pNode.color == RedBlackNode.BLACK ){
BrDeleteFixup(nNode);
}

return pNode;
}

删除后的修复和保持操作RB-DELETE-FIXUP(T, x) 

 private void BrDeleteFixup(RedBlackNode node){
if(node == null ){
return;
}
while (node != root && node.color == RedBlackNode.BLACK){
if(node == node.parent.left){
RedBlackNode bNode = node.parent.right;
if(bNode.color == RedBlackNode.RED){
bNode.color = RedBlackNode.BLACK;
node.parent.color = RedBlackNode.RED;
leftRotate(node.parent);
bNode = node.parent.right;
}
if(bNode.left.color == RedBlackNode.BLACK && bNode.right.color == RedBlackNode.BLACK){
bNode.color = RedBlackNode.RED;
node = node.parent;
continue;
}else if(bNode.right.color == RedBlackNode.BLACK){
bNode.left.color = RedBlackNode.BLACK;
bNode.color = RedBlackNode.RED;
rightRotate(bNode);
bNode = node.parent.left;
}
bNode.color = node.parent.color;
node.parent.color = RedBlackNode.BLACK;
bNode.right.color = RedBlackNode.BLACK;
leftRotate(node.parent);
node = root;

}else{
RedBlackNode bNode = node.parent.left;
if(bNode.color == RedBlackNode.RED){
bNode.color = RedBlackNode.BLACK;
node.parent.color = RedBlackNode.RED;
rightRotate(node.parent);
bNode = node.parent.left;
}
if(bNode.left.color == RedBlackNode.BLACK && bNode.right.color == RedBlackNode.BLACK){
bNode.color = RedBlackNode.RED;
node = node.parent;
continue;
}else if(bNode.left.color == RedBlackNode.BLACK){
bNode.right.color = RedBlackNode.BLACK;
bNode.color = RedBlackNode.RED;
leftRotate(bNode);
bNode = node.parent.right;
}
bNode.color = node.parent.color;
node.parent.color = RedBlackNode.BLACK;
bNode.left.color = RedBlackNode.BLACK;
rightRotate(node.parent);
node = root;
}
}
node.color = RedBlackNode.BLACK;

}

文中代码排版貌似不太好看,附上github的完整代码

https://gist.github.com/qarkly/4db9abf5c231f12e45d7

<div>
    作者:qarkly112649 发表于2014/6/29 15:28:37 [原文链接](http://blog.csdn.net/qarkly112649/article/details/35787283)
</div>
<div>
阅读:986 评论:0 [查看评论](http://blog.csdn.net/qarkly112649/article/details/35787283#comments)
</div>

[原]责任链模式

注:第一节 部分引自:http://www.cnblogs.com/kym/archive/2009/04/06/1430078.html

一、责任链模式(Chain of Responsiblity)

    定义: 使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系。将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理他为止。

       结构实图:

> > > >

>
> 责任链模式涉及角色如下:
> 待处理(Client)角色:一个需要处理的请求,可以是消息或者其他
>
抽象处理者(Handler)角色:定义一个对外处理请求的接口。由它将请求分发到具体处理者( ConcreteHandler)角色进行逐次请求处理。抽象处理者(Handler)角色和具体处理者(ConcreteHandler)角色是聚合关系
> 具体处理者(ConcreteHandler)角色:具体处理者接受到请求时,对其感兴趣的消息处理,或者传递给下个具体处理者处理
二、简单责任链模式的实现

>
接口定义:
>
>
public interface Filter {
>
> public void messageReceived(String msgType,List<String> contents,NextFliter nextFliter);
>
> public static interface NextFliter{
>
> void doFilter(String msgType,List<String> contents);
>
> }
>
> public static interface FilterChain{
> public void addFirst(Filter filter);
> public void addLast(Filter filter);
> public void messageReceived(String msgType, List<String> contents);
> }
>
> }

>
>

>
>
接口详细说明:

>
>

>
>
>
Filter接口:对应责任链模式中的就是具体处理者(ConcreteHandler)角色,只有一个方法messageReceived(String msgType,List<String> contents,NextFliter nextFliter)。其中msgType和contents就是需要处理的请求消息,对应责任链中的待处理(Client)角色。nextFliter是调用下一个具体处理(ConcreteHandler)角色,这样做的好处就是Filter不必只有下一个Filter的引用。具体如何实现见下面的FilterChain的实现
> FilterChain接口:对应责任链模式中的抽象处理(Handler)者角色。该接口持有所有的Filter,addFirst(Filter filter)和addLast(Filter filter)方法就是向FilterChain中加入具体处理者(ConcreteHandler)角色。当请求消息到来时,FilterChain调用自身的messageReceived方法,将请求传给第一个Filter,Filter再通过自己的messageReceived方法对请求处理或者传递给下一个Filter
>
NextFliter接口:从字面意思即下一个Filter,但这个接口同样由FilterChain实现,此接口主要是通知FilterChain获取下一个Filter处理请求消息
>
> FilterChain具体实现:

>
>
public class DefaultFilterChain implements Filter.NextFliter,Filter.FilterChain {
> private LinkedList<Filter> fliters = new LinkedList<Filter>();
> private int index = 0;
>
> public DefaultFilterChain(){}
> public void addLast(Filter fliter) {
> if (fliter == null)
> return;
> fliters.addLast(fliter);
> }
> public void addFirst(Filter fliter) {
> if (fliter == null)
> return;
> fliters.addFirst(fliter);
> }
>
> public void messageReceived(String msgType, List<String> contents) {
> index = -1;
> doFilter(msgType, contents);
> }
>
> public void doFilter(String msgType, List<String> contents) {
> index++;
> if( index < fliters.size()){
> fliters.get( index).messageReceived(msgType, contents, this);
> }
> }
> }

>
>

>
>
FilterChain实现总结:

>
>
      优点


> > 1. Filters使用链表存储,实现简单。每个Filter不必引用下一个Filter,由FilterChain实现,这样Filter实现比较简单方便
> > 2. FilterChain同时实现NextFliter,每个Filter在调用下一个Filter时只需要调用nextFilter.doFilter(…)方法,这样做实现了接口隔离的效果
> > 不足
>
> > 1. 内部使用index,造成线程不安全,如果对messageReceived方法同步代价太高,不合适
> > 2. 虽然FilterChain和NextFilter起到接口隔离效果,但两个接口同时由DefaultFilterChain实现,代码耦合度太高,不利于拓展
> > 3. 对请求消息不够抽象,造成功能单一
> > 4. 只能单向过滤处理消息,不能反向
> >
> > 优化空间
> > 1. 不使用LinkedList,自己实现单向链表或者双向链表
> > 2. 使用双向链表实现双向过滤处理,既可以处理messageReceived消息,又可以处理messageSent消息
> > 3. FilterChain和NextFilter分开实现,降低耦合度
> > 有机会介绍下在Mina中责任链是如何实现的…
<div>
    作者:qarkly112649 发表于2014/6/19 9:38:58 [原文链接](http://blog.csdn.net/qarkly112649/article/details/32312037)
</div>
<div>
阅读:208 评论:0 [查看评论](http://blog.csdn.net/qarkly112649/article/details/32312037#comments)
</div>