MINA2实用手册
- 格式:docx
- 大小:48.26 KB
- 文档页数:17
MINA2实用手册
作者:李庆丰
Email:scholers@
MINA框架是对java的NIO包的一个封装,简化了NIO程序开发的难度,
封装了很多底层的细节,然开发者把精力集中到业务逻辑上来,最近做了一
个相关的项目,为了备忘对MINA做一个总结。
一、服务端初始化及参数配置
MINA2初始化很简单。
基本的初始化参数如下:
//初始化Acceptor—可以不指定线程数量,MINA2里面默认是CPU数量+2 NioSocketAcceptor acceptor = new NioSocketAcceptor(5);
java.util.concurrent.Executor threadPool =
Executors.newFixedThreadPool(1500);//建立线程池
//加入过滤器(Filter)到Acceptor
acceptor.getFilterChain().addLast("exector", new
ExecutorFilter(threadPool));
//编码解码器
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new WebDecoder(),new
XmlEncoder()));
//日志
LoggingFilter filter = new LoggingFilter();
filter.setExceptionCaughtLogLevel(LogLevel.DEBUG);
filter.setMessageReceivedLogLevel(LogLevel.DEBUG);
filter.setMessageSentLogLevel(LogLevel.DEBUG);
filter.setSessionClosedLogLevel(LogLevel.DEBUG);
filter.setSessionCreatedLogLevel(LogLevel.DEBUG);
filter.setSessionIdleLogLevel(LogLevel.DEBUG);
filter.setSessionOpenedLogLevel(LogLevel.DEBUG);
acceptor.getFilterChain().addLast("logger", filter);
acceptor.setReuseAddress(true);//设置的是主服务监听的端口可以重
用
acceptor.getSessionConfig().setReuseAddress(true);//设置每一个
非主监听连接的端口可以重用
MINA2中,当启动一个服务端的时候,要设定初始化缓冲区的长度,如果不设置
这个值,系统默认为2048,当客户端发过来的消息超过设定值的时候,MINA2的
机制是分段接受的,将字符是放入缓冲区中读取,所以在读取消息的时候,需要
判断有多少次。
这样的好处就是可以节省通讯的流量。
acceptor.getSessionConfig().setReceiveBufferSize(1024);//设置输入缓
冲区的大小
acceptor.getSessionConfig().setSendBufferSize(10240);//设置输出缓
冲区的大小
//设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出
acceptor.getSessionConfig().setTcpNoDelay(true);
//设置主服务监听端口的监听队列的最大值为100,如果当前已经有100个连
接,再新的连接来将被服务器拒绝
acceptor.setBacklog(100);
acceptor.setDefaultLocalAddress(new
InetSocketAddress(port));
//加入处理器(Handler)到Acceptor
acceptor.setHandler(new YourHandler());
acceptor.bind();
}
二、初始化客户端
客户端的初始化和服务器端其实是一样的,就是初始化类不一样,客户端是作为发
送者的
SocketConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast(
"codec",
new ProtocolCodecFilter(new
XmlCodecFactory(Charset
1..forName(charsetName), null, sertType)));
//指定线程池
connector.getFilterChain().addLast("executor", new、、
ExecutorFilter());
//指定业务处理类
connector.setHandler(this);
三、处理流程
NioSocketAcceptor是MINA的适配器,一切都是从这里开始的。
MINA中有
个过滤器和处理器的概念,过滤器用来过滤数据,处理器用来处理数据。
具体来
说MINA的处理模型就是request->过滤器A->过滤器B->处理器->过滤器
B->过滤器A->response,这里的request和response类似serlvet的
request和response。
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new WebDecoder(),new
XmlEncoder()));
//request->WebDecoder->XmlHander->WebEncode->response
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new WebDecoder(),new XmlEncoder()));
//这里是处理逻辑的关键部位,请求的处理都是在WebDecoder类和
XmlEncoder类中处理,可以明显从命名上看出来一个是用来解码,另一个是用
来编码,requet过来后先进入WebDecoder类(实现了ProtocolDecoder接口)
进行解码处理,这里可以加入自己的逻辑把传进来的流解码成自己需要的信息。
而
XmlEncoder类(实现了ProtocolEncoder接口)是进行编码,在这个类里面加入
自己的逻辑把处理后的信息组装发送给客户端(response)。
而在解码和编码过程
中XmlHander(扩展了IoHandlerAdapter抽象类)起到了处理器的作用。
现在详细描述一下
request->WebDecoder->XmlHander->WebEncode->response的过程:
客户端发送一个请求到MINA服务器,这里相当于来了一个requet。
请求首先来
到WebDecoder类(实现了ProtocolDecoder接口)中的
boolean decode(IoSession session, IoBuffer in, ProtocolDecoderOutput
out) throws Exception{}方法
/*
参数in:用户请求信息全存在这里,读数据就从in这里读。
参数out:用来输出处理后的数据到Filter的下一个过滤器,如果没有过滤器了就
输出到XmlHander,这里有点和servelt的过滤器类似。
利用out.write(Object
object);这个函数可以把数据传到下一个Filter。
我们可以自己定义一个对象,我们假设为Request,用它来传递消息,那末这里就可以写成out.write(new RequsetMessage()); 如果这个方法返回false,就是说当前逻辑包还没接收完(也就是当前的IoBuffer并没有包含足够的数据),需要再次执行decode方法(再次获取新的IoBuffer),用来获取足够的数据。
如果返回值为true就表示可以不执行decode方法了,但是要激活handler方法,必须要调用out.write 方法。
public class RequestMessage{}//这里什么也不做
*/
*/
然后到XmlHander(扩展了IoHandlerAdapter抽象类)中的
void messageReceived(IoSession session, Object message) throws Exception{}方法
WriteFuture future = session.write(response);//session中必须加入这个代码,才会激活encode方法
future.addListener(IoFutureListener.CLOSE);//这个的作用是发送完毕后关闭连接,加了就是短连接,不然是长连接 ;
在XmlHanler类中可以在重载sessionIdle方法,这个方法判断整个SOCKET 连接通道是否空闲,可以再这里间隔(在服务店启动的时候设置idleTime)发送心跳包来保持各个长连接:
/**
*当网络通道空闲时此方法被调用,在这里可以判断是读空闲、写空闲还是两个都空闲,以便做出正确的处理
一般的网络通讯程序都要与服务器端保持长连接,所以这里可以发一下网络测试数据以保持与服务器端的连接
*@param session会话信息
*@param status状态
*@throws Exception异常
*/
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception
IoFutureListener里面有个operationComplete(IoFuture future)方法,当流发送完成之后才调用这个方法。
/*
参数message:用来获取Filter传递过来的对象.对应代码RequestMessage request = (RequestMessage) message;
参数session:用来发送数据到Filter.对应代码session.write(new
ResponseMessage());
public class ResponseMessage{}//这里什么也不做,假设存放处理后的数据
注意:对于一个MINA程序而言,对于XmlHander类只生成一个对象,所以要
考虑线程安全问题
*/
然后到
XmlEncoder类(实现了ProtocolEncoder接口)中的
boolean encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception{}
方法
/*
参数message:用来获取上一个Filter节点的数据或者处理器的数据(如果这个
过滤器为最靠近处理器的那个)
ResponseMessage response = (ResponseMessage)message;
参数out:用来输出数据到下一个Filter节点过或者到客户端,用
out.write(Object encodedMessage)把数据发送
出去,但是要注意的是,如果这个Filter下一个节点如果是客户端的话,那个这个
encodedMessage数据必须为
IoBuffer类型的,可以利用IoBuffer.wrap(byte[] byteArray)这个方法来格式
化输出数据
*/
四、大容量包的处理
MINA2中(MINA2 RC版本,MINA2.0正式版已经发布)服务端接受数据默认有一定长度的缓冲区(可以在启动的时候设置)。
那么对于大报文,怎么处理呢?比如说超过1024,甚至更多?MINA2为了节省网络流量,提高处理效率,会将大报文自动拆分(可能是存放MINA2中的缓冲区里面):比如2048字节的报文,就会拆分成两次;那么在接受的时候,就有一个如何判断是完整报文的问题,或者说是一个拆包组包的问题。
MINA2中初始化服务的时候是可以设置输入和输出的缓冲区的:
acceptor.getSessionConfig().setReadBufferSize(1024);
MINA2提供的案例是,在IoSession中设置一个类似于session,存在在当前
IoSession中的全局变量,在此IoSession中有效。
private final AttributeKey TEST = new AttributeKey(getClass(), "TES
T");
大家都知道,通过 SOCKET TCP/IP传输过来的报文是不知道边界的,所以一般会约定在前端固定长度的字节加上报文长度,让SERVER来根据这个长度来确定整个报文的边界,在我前面的博文有提到。
其实MINA2中有:
prefixedDataAvailable(4) int
方法,来判断固定长度的报文长度,但是参数只能是1,2,4;该方法很好用。
判断前四字节的整型值是否大于等于整个缓冲区的数据。
可以方便的判断一次
messageReceived 过来的数据是否完整。
(前提是自己设计的网络通讯协议前四字节等于发送数据的长度),如果你不是设定1,2,4字节来作为长度的话,那么就没辙了。
在你的解码操作中,MINA2的缓冲区发多少次报文,你的decode方法就会调用多少次。
上面设置了session之后,可以采用一个方法:
/**
*
* @param session
* 会话信息
* @return 返回session中的累积
*/
private Context getContext(IoSession session) {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
}
然后在你的decode方法中,首先从session取出数据对象,进行拼接:Context ctx = getContext(session);
// 先把当前buffer中的数据追加到Context的buffer当中
ctx.append(ioBuffer);
// 把position指向0位置,把limit指向原来的position位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
接着读取每次报文的总长度:
// 读取消息头部分
byte[] bLeng = new byte[packHeadLength];
buf.get(bLeng);
int length = -1;
try {
length = Integer.parseInt(new String(bLeng));
} catch (NumberFormatException ex) {
ex.printStackTrace();
}
if (length > 0) {
ctx.setMsgLength(length);
}
在读取到每次报文的长度之后,就接着循环判断BUF里面的字节数据是否已经全部接受完毕了,如果没有接受完毕,那么就不处理;下面是完整处理的代码:
while (buf.remaining() >= packHeadLength) {
buf.mark();
// 设置总长度
if (ctx.getMsgLength() <= 0) {
// 读取消息头部分
byte[] bLeng = new byte[packHeadLength];
buf.get(bLeng);
int length = -1;
try {
length = Integer.parseInt(new String(bLeng));
} catch (NumberFormatException ex) {
ex.printStackTrace();
}
if (length > 0) {
ctx.setMsgLength(length);
}
}
// 读取消息头部分
int length = ctx.getMsgLength();
// 检查读取的包头是否正常,不正常的话清空buffer
if (length < 0) { // || length > maxPackLength2) {
buf.clear();
out.write("ERROR!");
break;
// 读取正常的消息包,并写入输出流中,以便IoHandler进行处理
} else if (length > packHeadLength && buf.remaining() >= length) {
//完整的数据读取之后,就可以开始做你自己想做的操作
了
} else {
// 如果消息包不完整
// 将指针重新移动消息头的起始位置
buf.reset();
break;
}
}
if (buf.hasRemaining()) { // 如果有剩余的数据,则放入Session中
// 将数据移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(2048).setAutoExpand(
true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
} else { // 如果数据已经处理完毕,进行清空
buf.clear();
}
为了便于操作,最好设置一个内部类:
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buf;
private int msgLength = 0;
private int overflowPosition = 0;
/**
*
*
*/
private Context() {
decoder = charset.newDecoder();
buf = IoBuffer.allocate(80).setAutoExpand(true); }
/**
*
*
* @return CharsetDecoder
*/
public CharsetDecoder getDecoder() { return decoder;
}
/**
*
*
* @return IoBuffer
*/
public IoBuffer getBuffer() {
return buf;
}
/**
*
*
* @return overflowPosition
*/
public int getOverflowPosition() { return overflowPosition;
}
/**
*
*
* @return matchCount
*/
public int getMsgLength() {
return msgLength;
}
/**
*
*
* @param matchCount
* 报文长度
*/
public void setMsgLength(int msgLength) { this.msgLength = msgLength;
}
/**
*
*
*/
public void reset() {
this.buf.clear();
this.overflowPosition = 0;
this.msgLength = 0;
this.decoder.reset();
}
/**
*
* @param in
* 输入流
*/
public void append(IoBuffer in) {
getBuffer().put(in);
}
}
五多个SOCKET通讯的处理
在MINA2中两个SOCKET SERVER进行通讯,可以采用虚拟机内部的管道的方式。
在MINA2的源码包里面自带了这个例子:
IoAcceptor acceptor = new VmPipeAcceptor();
VmPipeAddress address = new VmPipeAddress(8080);
// Set up server
acceptor.setHandler(new TennisPlayer());
acceptor.bind(address);
// Connect to the server.
VmPipeConnector connector = new VmPipeConnector();
connector.setHandler(new TennisPlayer());
ConnectFuture future = connector.connect(address);
future.awaitUninterruptibly();
IoSession session = future.getSession();
// Send the first ping message
session.write(new TennisBall(10));
// Wait until the match ends.
session.getCloseFuture().awaitUninterruptibly();
acceptor.unbind();
也可以将IoSession对方放入全局的线程安全的Map中去,当需要发送的时候根据
KEY取出来,然后write出去。
六MIN2的BUG
我们知道,在MINA2中,发送和接受时两个独立的工作线程,但是可以设置一个参
数,当服务端发送消息之后同步读取客户端的返回:
session.getConfig().setUseReadOperation(true);
近日,采用MINA2(RC)的同步读取方法,发现无法真的同步读取客户端的返回;
场景是:服务端发送一个消息给客户端,需要同步等待客户端的一个消息回执,然后服务端的程序继续执行;但是实际在使用的时候这个设置无效。
sendSession.getConfig().setUseReadOperation(true);
WriteFuture future = sendSession.write(xmlMsgBean); // 发送数据
future.awaitUninterruptibly(); // 等待发送数据操作完成
if (future.getException() != null) {
throw new AppException(future.getException().getMessage());
}
if (future.isWritten()) {
// 数据已经被成功发送
logger.debug("数据已经被成功发送");
ReadFuture readFuture = sendSession.read();
readFuture.awaitUninterruptibly();
if (readFuture.getException() != null) {
throw new AppException(readFuture.getException().getMessage()); }
sendSession.getConfig().setUseReadOperation(false);
return ((XmlMsgBean) readFuture.getMessage()).getStrErrMsg();
} else {
// 数据发送失败
logger.debug("数据发送失败");
}
后来用GOOGLE搜索了一下,发现在MINA的官网上,老外同样问了一个一模一样的问题,并且提了一个BUG上去,但是目前BUG的状态还是open;
https:///jira/browse/DIRMINA-777
I'm attempting to perform a synchronous write/read in a demux-based client application with MINA 2.0 RC1, but it seems to get stuck. Here is my code:
{code}
public boolean login(final String username, final String password) {
// block inbound messages
session.getConfig().setUseReadOperation(true);
// send the login request
final LoginRequest loginRequest = new LoginRequest(username, password); final WriteFuture writeFuture = session.write(loginRequest);
writeFuture.awaitUninterruptibly();
if (writeFuture.getException() != null) {
session.getConfig().setUseReadOperation(false);
return false;
}
// retrieve the login response
final ReadFuture readFuture = session.read();
readFuture.awaitUninterruptibly();
if (readFuture.getException() != null) {
session.getConfig().setUseReadOperation(false);
return false;
}
// stop blocking inbound messages
session.getConfig().setUseReadOperation(false);
// determine if the login info provided was valid
final LoginResponse loginResponse = (LoginResponse)readFuture.getMessage(); return loginResponse.getSuccess();
}
{code}
I can see on the server side that the LoginRequest object is retrieved, and a L oginResponse message is sent. On the client side, the DemuxingProtocolCodecFact ory receives the response, but after throwing in some logging, I can see that t he client gets stuck on the call to `readFuture.awaitUninterruptibly() `.
I can't for the life of me figure out why it is stuck here based upon my own co de. I properly set the read operation to true on the session config, meaning th at messages should be blocked. However, it seems as if the message no longer ex ists by time I try to read response messages synchronously.
Key: DIRMINA-777
Type: Bug
Status: Open
Priority: Blocker
Assignee: Unassigned
Reporter: Matt Huggins
Votes: 0
Watchers: 0。