學(xué)習(xí)socket nio 之 mina實例
1:mina之淺談
mina是apache基于socket nio的一套框架,可以從apache官網(wǎng)下載jar包和源碼。試用起來非常方便,也非常強大。如果對socket nio還不是很了解的話:請看一下這兩篇文章
學(xué)習(xí)bytebuffer和socket nio實例
這里我簡單的介紹一下mina的框架:
IoService:這個接口是網(wǎng)絡(luò)的入口,IoAcceptor和IoConnector都實現(xiàn)這個接口。從名字上我們可以看得出來IoAcceptor是接受鏈接的(服務(wù)端),而IoConnector是用來鏈接的(客戶端)。
IoFilter:過濾器。他是用來過濾消息的。從IoService(網(wǎng)絡(luò)接口)出來的數(shù)據(jù)或者進入IoService(網(wǎng)絡(luò)接口)的數(shù)據(jù)都會經(jīng)過IoFilter的處理。最重要的就是日志和解碼和編碼。
IoHandler:處理器。它是鏈接應(yīng)用和IoFilter的橋梁,是進行業(yè)務(wù)處理的,從IoFilter出來的數(shù)據(jù)會發(fā)到IoHandler中處理。
從一個圖中我們來了解一下,這幾個接口之間的關(guān)系:
看著這張圖片,就應(yīng)該明白mina中的數(shù)據(jù)時怎么傳輸?shù)牧税伞?雌饋砗芎唵蔚臉幼樱?br> 2:mina實例
目標(biāo):
不管客戶端發(fā)送什么數(shù)據(jù)到服務(wù)端,服務(wù)端口返回Hello world。
服務(wù)端實現(xiàn)
IoHandler:業(yè)務(wù)處理層
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
public class ServerHandler extends IoHandlerAdapter{
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception{
System.out.println("報錯啦............");
cause.printStackTrace();
}
@Override
public void messageReceived( IoSession session, Object message ) throws Exception{
String str = message.toString();
System.out.println("messageReceived:"+str);
session.write("hello world");
}
@Override
public void sessionIdle( IoSession session, IdleStatus status ) throws Exception{
System.out.println( "IDLE " + session.getIdleCount( status ));
}
}
IoFilter:過濾器層
這里我們做一個解碼的編碼的過濾層,這也是mina中最常用的。首先我們需要定義屬于我們自己的協(xié)議,也就是數(shù)據(jù)包的格式:別以為這很復(fù)雜,其實很簡單的。
我們知道數(shù)據(jù)都是字節(jié)類型的,那么我們的協(xié)議格式如下:前兩位表示數(shù)據(jù)包的長度(一個short類型正好兩個字節(jié)),第三位是閑置位,后面的是數(shù)據(jù)。長度是閑置位和
數(shù)據(jù)長度的和。這樣我們就可以根據(jù)前兩位確定,我們的數(shù)據(jù)包到那里結(jié)束。那么我們循環(huán)這么讀,就會取得所有的數(shù)據(jù)包。是不是很簡單啊,這個格式就是我們的協(xié)議。
為了更簡單,這里我們客戶端發(fā)往服務(wù)端的數(shù)據(jù)進行編碼和解碼,服務(wù)端發(fā)往客戶端的就不編碼了,客戶端也就不用解碼。服務(wù)端使用mina,客戶端我們就使用基本的socket nio。
編碼工廠類:
public class CodecFactory extends DemuxingProtocolCodecFactory{
public CodecFactory(){
super.addMessageEncoder(String.class, Encoder.class);
super.addMessageDecoder(Decoder.class);
}
}
解碼類:
import java.util.ArrayList;
import java.util.List;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.mina.filter.codec.demux.MessageDecoder;
import org.apache.mina.filter.codec.demux.MessageDecoderResult;
public class Decoder implements MessageDecoder {
private byte[] r_curPkg = null;
private int r_pos = -1; // 包計數(shù)器
static private final int PKG_SIZE_BYTES = 2;//包長度
public Decoder() { }
@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
return MessageDecoderResult.OK;
}
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception {
List<String> list = new ArrayList<String>();
while (in.remaining() >= PKG_SIZE_BYTES || (r_pos >= 0
&& in.hasRemaining())) {// 循環(huán)接收包,4為一個整型,表示包長度b,
如果上一個包未接收完成時,繼續(xù)接收
// 如果上個包已收完整,則創(chuàng)建新的包
if (r_pos == -1) {
//得到下一個包的長度,長度不包括前兩位,即包的長度=壓縮位長度+數(shù)據(jù)位長度
int pkgLen = in.getShort();
//如果包長度小于0,那么此包錯誤,解碼失敗,返回。
if (pkgLen < 0) {
return MessageDecoderResult.NOT_OK;
}
in.get();
r_curPkg = new byte[pkgLen-1]; //數(shù)組長度為數(shù)據(jù)長度
r_pos = 0;
}
int need = r_curPkg.length - r_pos; //需要讀取的數(shù)據(jù)長度
int length = in.remaining();//緩沖區(qū)中可讀的數(shù)據(jù)長度
if (length >= need) {// 可以把當(dāng)前包讀完整
in.get(r_curPkg, r_pos, need); // 復(fù)制緩沖區(qū)中的數(shù)據(jù)到r_curPkg中
// 處理接收到一個完整的包數(shù)據(jù)后,把包添加到池中,判斷是否需要需要解壓
byte[] data = r_curPkg;
String str = new String(data);
list.add(str);
r_curPkg = null;
r_pos = -1;
} else {
// 如果剩下的字節(jié)數(shù),不夠一個包則
int remainBytes = in.remaining();
in.get(r_curPkg, r_pos, remainBytes);
r_pos += remainBytes;
return MessageDecoderResult.NEED_DATA;
}
}
for (String protocol : list) {
out.write(protocol);
}
return MessageDecoderResult.OK;
}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out) {
}
}
編碼類:(沒有進行編碼,只進行了數(shù)據(jù)發(fā)送)
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.apache.mina.filter.codec.demux.MessageEncoder;
public class Encoder implements MessageEncoder<String>{
public Encoder(){
}
@Override
public void encode(IoSession session, String message, ProtocolEncoderOutput out)
throws Exception {
System.out.println("encode..................");
String value = (String) message;
IoBuffer buf = IoBuffer.allocate(value.getBytes().length);
buf.setAutoExpand(true);
if (value != null){
buf.put(value.trim().getBytes());
}
buf.flip();
out.write(buf);
out.flush();
}
}
IoService層:
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class MinaServer {
private static final int PORT = 9123;
public static void main(String [] args) throws IOException{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CodecFactory()));
acceptor.setHandler(new ServerHandler());
acceptor.getSessionConfig().setReadBufferSize( 3 );
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
acceptor.bind( new InetSocketAddress(PORT) );
}
}
到這里我們的服務(wù)端代碼就寫完了,
客戶端實現(xiàn)
<span style="font-size:12px">public class SocketClient {
public static void main(String...args)throws Exception{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost",9123));
byte [] bytes = "aaaa".getBytes();
//對數(shù)據(jù)包進行編碼
ByteBuffer buffer = ByteBuffer.allocate(bytes.length+3);
buffer.putShort((short)(bytes.length+1)); //包長度
buffer.put((byte)1);//閑置位
buffer.put(bytes);//數(shù)據(jù)
buffer.flip();
socketChannel.write(buffer);
socketChannel.socket().shutdownOutput();
String obj = receive(socketChannel);
System.out.println(obj);
}
private static String receive(SocketChannel socketChannel)throws Exception{
ByteBuffer buffer = ByteBuffer.allocate(1024);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int size = 0;
byte [] bytes = null;
while((size = socketChannel.read(buffer))>=0){
buffer.flip();
bytes = new byte[size];
buffer.get(bytes);
baos.write(bytes);
buffer.clear();
}
bytes = baos.toByteArray();
baos.close();
return new String(bytes);
}
}
</span>
所有的代碼都寫完了,先啟動服務(wù)端的MinaServer,然后再啟動客戶端,我們就會看到結(jié)果。
作者:chen.yu
深信服三年半工作經(jīng)驗,目前就職游戲廠商,希望能和大家交流和學(xué)習(xí),
微信公眾號:編程入門到禿頭 或掃描下面二維碼
零基礎(chǔ)入門進階人工智能(鏈接)