2015-01-27 4 views
0

나는 SocketChannel에 와 TCP에 의해 서버에 연결하는 응용 프로그램을 작성하지만 두 가지 문제가 있습니다 첫 번째는 미성년자SocketChannel에이 문제

  1. 을 - 때때로 어떤 알 수없는 이유로 내가 연결된 메시지와
  2. 두 번째를 보내 결정적입니다 - 주기적으로 앱에서 메시지 송수신을 중지합니다.

어떤 생각이 잘못 되었습니까?

import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.SocketChannel; 
import java.nio.charset.Charset; 
import java.nio.charset.CharsetDecoder; 
import java.util.List; 
import java.util.Queue; 
import java.util.Set; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class SocketSelectorWorker extends Thread { 
    private static final transient Logger log = LoggerFactory.getLogger(SocketSelectorWorker.class); 
    private ExecutorService executorService = Executors.newFixedThreadPool(3); 
    private final Queue<byte[]> messages; 
    private Selector selector; 

    public SocketSelectorWorker(Queue messages, Selector selector) { 
    super(); 
    this.selector = selector; 
    this.session = session; 
    this.messages = messages; 
    } 

    @Override 
    public void run() { 
    super.run(); 
    while (isConnectionAlive()) { 
     try { 
     // Wait for an event 
     selector.select(); 
     } catch (IOException e) { 
     log.error("Selector error: {}", e.toString()); 
     log.debug("Stacktrace: ", e); 
     session.closeConnection(); 
     break; 
     } 
     handleSelectorkeys(selector.selectedKeys()); 
    } 
    executorService.shutdown(); 
    log.debug("worker stopped"); 
    } 

    private void handleSelectorkeys(Set<SelectionKey> selectedKeys) { 
    for (SelectionKey selKey : selector.selectedKeys()) { 
     selector.selectedKeys().remove(selKey); 
     try { 
     processSelectionKey(selKey); 
     } catch (IOException e) { 
     // Handle error with channel and unregister 
     selKey.cancel(); 
     log.error("Selector error: {}", e.toString()); 
     log.debug("Stacktrace: ", e); 
     } 
    } 
    } 

    public void processSelectionKey(SelectionKey selKey) throws IOException { 

    // Since the ready operations are cumulative, 
    // need to check readiness for each operation 
    if (selKey.isValid() && selKey.isConnectable()) { 
     log.debug("connectable"); 
     // Get channel with connection request 
     SocketChannel sChannel = (SocketChannel) selKey.channel(); 

     boolean success = sChannel.finishConnect(); 
     if (!success) { 
     // An error occurred; handle it 
     log.error("Error on finish"); 
     // Unregister the channel with this selector 
     selKey.cancel(); 
     } 
    } 

    if (selKey.isValid() && selKey.isReadable()) { 
     log.debug("readable"); 
     readMessage(selKey); 
    } 

    if (selKey.isValid() && selKey.isWritable()) { 
     log.debug("writable"); 
     writeMessage(selKey); 
    } 

    if (selKey.isValid() && selKey.isAcceptable()) { 
     log.debug("Acceptable"); 
    } 

    } 

    private void writeMessage(SelectionKey selKey) throws IOException { 
    byte[] message = messages.poll(); 
    if (message == null) { 
     return; 
    } 

    // Get channel that's ready for more bytes 
    SocketChannel socketChannel = (SocketChannel) selKey.channel(); 

    // See Writing to a SocketChannel 
    // Create a direct buffer to get bytes from socket. 
    // Direct buffers should be long-lived and be reused as much as 
    // possible. 

    ByteBuffer buf = ByteBuffer.allocateDirect(1024);// .allocateDirect(toSend.getBytes().length); 

    // try { 
    // Fill the buffer with the bytes to write; 
    // see Putting Bytes into a ByteBuffer 
    // buf.put((byte)0xFF); 

    buf.clear(); 
    buf.put(new byte[] { 0x02 }); 
    buf.put(message); 
    buf.put(new byte[] { 0x03 }); 
    // Prepare the buffer for reading by the socket 
    buf.flip(); 

    // Write bytes 
    int numBytesWritten = socketChannel.write(buf); 
    log.debug("Written: {}", numBytesWritten); 

    while (buf.hasRemaining()) { 
     numBytesWritten = socketChannel.write(buf); 
     log.debug("Written remining: {}", numBytesWritten); 
    } 
    } 

    private void readMessage(SelectionKey selKey) throws IOException { 
    // Get channel with bytes to read 
    SocketChannel socketChannel = (SocketChannel) selKey.channel(); 

    // See Reading from a SocketChannel 
    // Create a direct buffer to get bytes from socket. 
    // Direct buffers should be long-lived and be reused as much as 
    // possible. 
    ByteBuffer buf = ByteBuffer.allocateDirect(2048); 
    Charset charset = Charset.forName("UTF-8");// Charset.forName("ISO-8859-1"); 
    CharsetDecoder decoder = charset.newDecoder(); 

    // try { 
    // Clear the buffer and read bytes from socket 
    buf.clear(); 
    int numBytesRead = socketChannel.read(buf); 

    if (numBytesRead == -1) { 
     // No more bytes can be read from the channel 
     // socketChannel.close(); 
     return; 
    } 
    log.debug("Read bytes: {}", numBytesRead); 
    // To read the bytes, flip the buffer 
    buf.flip(); 

    String result = decoder.decode(buf).toString(); 

    log.debug("Read string: {}", result); 
    //processMessage(result.getBytes()); 
    } 

} 

답변

1
  1. TCP는 메시지 경계가 없습니다. 바이트 스트림 프로토콜입니다. 모든 메시지 경계는 사용자가 결정합니다.
  2. 선택 키가 올바르게 처리되지 않습니다. 반복을 통해 반복자를 통해 제거해야하며 집합을 통해 제거해야합니다. 이는 향상된 for-loop를 사용할 수 없음을 의미합니다. 키를 건너 뛰는 것 같습니다.

  3. read()에서 -1이되면 채널을 닫아야합니다.

  4. IOException이 표시되면 키를 취소하는 것만으로는 충분하지 않습니다. NB가 자동으로 키를 취소하는 채널을 닫아야합니다.