學(xué)習(xí)socket nio 之 mina實例

1:mina之淺談

            mina是apache基于socket nio的一套框架,可以從apache官網(wǎng)下載jar包和源碼。試用起來非常方便,也非常強大。如果對socket nio還不是很了解的話:請看一下這兩篇文章

             學(xué)習(xí)bytebuffer和socket nio實例

            這里我簡單的介紹一下mina的框架:

            IoService:這個接口是網(wǎng)絡(luò)的入口,IoAcceptor和IoConnector都實現(xiàn)這個接口。從名字上我們可以看得出來IoAcceptor是接受鏈接的(服務(wù)端),而IoConnector是用來鏈接的(客戶端)。

            IoFilter:過濾器。他是用來過濾消息的。從IoService(網(wǎng)絡(luò)接口)出來的數(shù)據(jù)或者進入IoService(網(wǎng)絡(luò)接口)的數(shù)據(jù)都會經(jīng)過IoFilter的處理。最重要的就是日志和解碼和編碼。

           IoHandler:處理器。它是鏈接應(yīng)用和IoFilter的橋梁,是進行業(yè)務(wù)處理的,從IoFilter出來的數(shù)據(jù)會發(fā)到IoHandler中處理。

           從一個圖中我們來了解一下,這幾個接口之間的關(guān)系:

看著這張圖片,就應(yīng)該明白mina中的數(shù)據(jù)時怎么傳輸?shù)牧税伞?雌饋砗芎唵蔚臉幼樱?br> 2:mina實例
目標(biāo):

     不管客戶端發(fā)送什么數(shù)據(jù)到服務(wù)端,服務(wù)端口返回Hello world。
服務(wù)端實現(xiàn)
IoHandler:業(yè)務(wù)處理層

    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
     
    public class ServerHandler extends IoHandlerAdapter{
        @Override
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception{
            System.out.println("報錯啦............");
            cause.printStackTrace();
        }
        
        @Override
        public void messageReceived( IoSession session, Object message ) throws Exception{
            String str = message.toString();
            System.out.println("messageReceived:"+str);
            session.write("hello world");
        }
        
        @Override
        public void sessionIdle( IoSession session, IdleStatus status ) throws Exception{
            System.out.println( "IDLE " + session.getIdleCount( status ));
        }
    }

 
IoFilter:過濾器層

         這里我們做一個解碼的編碼的過濾層,這也是mina中最常用的。首先我們需要定義屬于我們自己的協(xié)議,也就是數(shù)據(jù)包的格式:別以為這很復(fù)雜,其實很簡單的。

        我們知道數(shù)據(jù)都是字節(jié)類型的,那么我們的協(xié)議格式如下:前兩位表示數(shù)據(jù)包的長度(一個short類型正好兩個字節(jié)),第三位是閑置位,后面的是數(shù)據(jù)。長度是閑置位和

數(shù)據(jù)長度的和。這樣我們就可以根據(jù)前兩位確定,我們的數(shù)據(jù)包到那里結(jié)束。那么我們循環(huán)這么讀,就會取得所有的數(shù)據(jù)包。是不是很簡單啊,這個格式就是我們的協(xié)議。

      為了更簡單,這里我們客戶端發(fā)往服務(wù)端的數(shù)據(jù)進行編碼和解碼,服務(wù)端發(fā)往客戶端的就不編碼了,客戶端也就不用解碼。服務(wù)端使用mina,客戶端我們就使用基本的socket nio。

編碼工廠類:

    public class CodecFactory extends DemuxingProtocolCodecFactory{
        public CodecFactory(){
            super.addMessageEncoder(String.class, Encoder.class);
            super.addMessageDecoder(Decoder.class);
        }
    }

解碼類:

    import java.util.ArrayList;
    import java.util.List;
     
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    import org.apache.mina.filter.codec.demux.MessageDecoder;
    import org.apache.mina.filter.codec.demux.MessageDecoderResult;
    public class Decoder implements MessageDecoder {
        
        private byte[] r_curPkg = null;
        private int r_pos = -1; // 包計數(shù)器
        static private final int PKG_SIZE_BYTES = 2;//包長度
        
        public Decoder() { }
     
        @Override
        public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
            return MessageDecoderResult.OK;
        }
        
        @Override
        public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception {
            List<String> list = new ArrayList<String>();
            while (in.remaining() >= PKG_SIZE_BYTES || (r_pos >= 0 && in.hasRemaining())) {// 循環(huán)接收包,4為一個整型,表示包長度b, 如果上一個包未接收完成時,繼續(xù)接收
                // 如果上個包已收完整,則創(chuàng)建新的包
                if (r_pos == -1) {
                    //得到下一個包的長度,長度不包括前兩位,即包的長度=壓縮位長度+數(shù)據(jù)位長度
                    int pkgLen = in.getShort();
                    
                    //如果包長度小于0,那么此包錯誤,解碼失敗,返回。
                    if (pkgLen < 0) {
                        return MessageDecoderResult.NOT_OK;
                    }
                    in.get();
                    r_curPkg = new byte[pkgLen-1]; //數(shù)組長度為數(shù)據(jù)長度
                    r_pos = 0;
                }
                int need = r_curPkg.length - r_pos; //需要讀取的數(shù)據(jù)長度
                int length = in.remaining();//緩沖區(qū)中可讀的數(shù)據(jù)長度
                if (length >= need) {// 可以把當(dāng)前包讀完整
                    in.get(r_curPkg, r_pos, need); // 復(fù)制緩沖區(qū)中的數(shù)據(jù)到r_curPkg中
                    // 處理接收到一個完整的包數(shù)據(jù)后,把包添加到池中,判斷是否需要需要解壓
                    byte[] data = r_curPkg;
                    String str = new String(data);
                    list.add(str);
                    r_curPkg = null;
                    r_pos = -1;
                } else {
                    // 如果剩下的字節(jié)數(shù),不夠一個包則
                    int remainBytes = in.remaining();
                    in.get(r_curPkg, r_pos, remainBytes);
                    r_pos += remainBytes;
                    return MessageDecoderResult.NEED_DATA;
                }
            }
            for (String protocol : list) {
                out.write(protocol);
            }
            return MessageDecoderResult.OK;
        }
     
        @Override
        public void finishDecode(IoSession session, ProtocolDecoderOutput out) {
     
        }
        
        
    }

編碼類:(沒有進行編碼,只進行了數(shù)據(jù)發(fā)送)

    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolEncoderOutput;
    import org.apache.mina.filter.codec.demux.MessageEncoder;
    public class Encoder implements MessageEncoder<String>{
     
        public Encoder(){
            
        }
     
        @Override
        public void encode(IoSession session, String message, ProtocolEncoderOutput out)
                throws Exception {
                System.out.println("encode..................");
                String value = (String) message;  
                IoBuffer buf = IoBuffer.allocate(value.getBytes().length);  
                buf.setAutoExpand(true);  
                if (value != null){
                    buf.put(value.trim().getBytes());  
                }  
                buf.flip();  
                out.write(buf);
                out.flush();
        }
    }

IoService層:

    import java.io.IOException;
    import java.net.InetSocketAddress;
     
    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
     
    public class MinaServer {
     
        private static final int PORT = 9123;
        
        public static void main(String [] args) throws IOException{
            
            IoAcceptor acceptor = new NioSocketAcceptor();
            acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
                    acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CodecFactory()));
            acceptor.setHandler(new ServerHandler());
            
            
            acceptor.getSessionConfig().setReadBufferSize( 3 );
                    acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
            acceptor.bind( new InetSocketAddress(PORT) );
        }
    }

到這里我們的服務(wù)端代碼就寫完了,
客戶端實現(xiàn)

    <span style="font-size:12px">public class SocketClient {
     
        public static void main(String...args)throws Exception{
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("localhost",9123));
            byte [] bytes = "aaaa".getBytes();
            
            //對數(shù)據(jù)包進行編碼
            ByteBuffer buffer = ByteBuffer.allocate(bytes.length+3);
            buffer.putShort((short)(bytes.length+1)); //包長度
            buffer.put((byte)1);//閑置位
            buffer.put(bytes);//數(shù)據(jù)
            buffer.flip();
            socketChannel.write(buffer);
            socketChannel.socket().shutdownOutput();
            
            String obj = receive(socketChannel);
            System.out.println(obj);
        }
        
        private static String receive(SocketChannel socketChannel)throws Exception{
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            int size = 0;
            byte [] bytes = null;
            while((size = socketChannel.read(buffer))>=0){
                buffer.flip();
                bytes = new byte[size];
                buffer.get(bytes);
                baos.write(bytes);
                buffer.clear();
            }
            bytes = baos.toByteArray();
            baos.close();
            return new String(bytes);
        }
    }
    </span>

所有的代碼都寫完了,先啟動服務(wù)端的MinaServer,然后再啟動客戶端,我們就會看到結(jié)果。

作者:chen.yu
深信服三年半工作經(jīng)驗,目前就職游戲廠商,希望能和大家交流和學(xué)習(xí),
微信公眾號:編程入門到禿頭 或掃描下面二維碼
零基礎(chǔ)入門進階人工智能(鏈接)