网络编程相关知识杂记之二(TCP&UDP编程相关)

Posted by W-M on December 31, 2017

本文主要记录了个人在学习网络编程相关知识时遇到的问题及自己的思考,主要用于备忘,错误难免,敬请指出!


UDP编程相关

UDP Server端与Client端Demo

普通UDP Client与UDP Server:

public class DatagramServer {

    private static final int PORT = 8099;

    public static void main(String[] args) throws InterruptedException {
        byte[] buffer = new byte[1024];
        try(DatagramSocket socket = new DatagramSocket(PORT)) {
            DatagramPacket request = new DatagramPacket(buffer, 1024);
            while (true) {
                try {
                    socket.receive(request);
                    System.out.println("data length: " + request.getData().length + " length: " + request.getLength() + " " + new String(request.getData(), 0, request.getLength(), "UTF-8"));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (SocketException e) {
            e.printStackTrace();
        }
    }
}

public class DatagramClient {

    private static InetAddress HOST;
    private static final int PORT = 8099;

    static {
        try {
            HOST = InetAddress.getByName("localhost");
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws SocketException {
        try(DatagramSocket socket = new DatagramSocket(null)) {
            socket.setReuseAddress(true);
            socket.bind(new InetSocketAddress(8888));
            socket.connect(HOST, PORT);
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 30; i++) {
                sb.append(i);
                System.out.println(sb);
                byte[] buffer = sb.toString().getBytes("UTF-8");
                DatagramPacket request = new DatagramPacket(buffer, buffer.length, HOST, PORT);
                socket.send(request);
            }
            System.out.println("数据发送完成");
        } catch (SocketException e) {
            e.printStackTrace();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

采用生产者-消费者模型的UDP Server:

//LogUDPThread接收到来的DatagramPacket,并将其加入阻塞队列中等待消费者线程消费。  
public class LogUDPThread extends Thread {

    protected DatagramSocket socket = null;
    protected boolean listen = true;

    public LogUDPThread() throws IOException {
        this("UDPThread");
    }

    public LogUDPThread(String name) throws IOException {
        super(name);
        socket = new DatagramSocket(4445);
    }

    public void run() {
        byte[] buf = new byte[10000];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        
        while (listen) {
            try {
                // receive request,
				//每次接收数据可以使用同一个DatagramPacket类对象,每次receive之后都会更新其中的data,remote IP,remote Port
				//等数据
                socket.receive(packet);
                
                // figure out response
				//将DatagramPacket对象中数据传出时不要直接使用packet.getData()中的数据,而要将其复制一份再传出
                String packetData = new String(packet.getData(), "UTF8");
                RawLog log = new RawLog();
                log.dataString = packetData;
                log.timestamp = new Date();
                log.sourceAddress = packet.getAddress().getHostAddress();
                log.sourceName = packet.getAddress().getCanonicalHostName();
                log.sourceType = "UDP";
                
				//将其加入阻塞队列中等待消费者消费
                Main.messagesToLog.add(log);

            } catch (Exception e) {
                e.printStackTrace();
                listen = false;
            }
        }
        socket.close();
    }
}

采用生产者-消费者模型实现的UDPServer的意义在于相比于使用操作系统提供给我们的UDP数据接收缓冲区,我们可以使用阻塞队列(比如LinkedBlockingQueue等)自定义更大的缓冲区,并可以更快的处理收到的UDP数据包。

UDP数据报抽象类DatagramPacket

发送数据时,我们使用DatagramPacket类封装我们要发送的数据,接收数据时,也需要使用DatagramPacket类将接收到的UDP数据报解析后存入DatagramPacket类对象中供应用程序使用。DatagramPacket类实现其实很简单,只是把我们发送数据与接收数据时需要使用的几个东西封装在了一起,其源码如下:

public final class DatagramPacket {
	byte[] buf;//存储待发送的数据  
        int offset;      
        int length;
        int bufLength;
        InetAddress address;//发送数据时存储的是目的地的地址;接收数据时存储的是远端数据发送方的地址,由receive方法填充
        int port;//发送数据时存储的是目的地的端口;接收数据时存储的是远端数据发送方的端口,由receive方法填充

	//使用DatagramPacket接收数据时使用下面两个构造函数
	public DatagramPacket(byte buf[], int offset, int length) {...}
	public DatagramPacket(byte buf[], int length) {...}

	//使用DatagramPacket发送数据时使用下面四个构造函数
	public DatagramPacket(byte buf[], int offset, int length, InetAddress address, int port) {...}
	public DatagramPacket(byte buf[], int offset, int length, SocketAddress address) {...}
	public DatagramPacket(byte buf[], int length, InetAddress address, int port) {...}
	public DatagramPacket(byte buf[], int length, SocketAddress address) {...}
	
	...
}

在使用DatagramSocket.receive(DatagramPacket p)方法接收数据时,我们在创建DatagramPacket可以指定将数据存入到哪个buffer中,从buffer的哪里开始存储,从开始位置之后的buffer中的多少个字节可以用来接收数据。在receive时,会根据offset和bufLength变量将数据报存入到buffer的指定位置中。比如接受到了10字节大小的数据报,offset变量为15,bufLength变量为1000,则接收此数据后buffer中从第15个字节开始到第24个字节存储的是数据报中的数据;又比如接受到了1010字节大小的数据报,由于bufLength变量为1000,一次最多只能接收1000字节大小的数据报,超出10个字节的部分将会被截断丢弃,接收数据后buffer中从第15个字节到1014个字节存储的是接收的数据。receive方法会改变DatagramPacket中length成员变量的值为接收的数据报的长度,比如接收了10个字节,length变为10,29个字节就变为29;DatagramPacket中buflength成员变量的值只能通过创建DatagramPacket对象时或者setLength()方法指定。receive方法是根据DatagramPacket中offset和bufLength变量进行数据填充的,与length成员变量无关。offset变量初始设置之后就不会再改变,每次接收的数据都从offset变量开始进行填充,这使得我们可以复用DatagramPacket对象接收数据。receive方法还会将DatagramPacket对象中address和port置为数据报发送方的ip和端口。

在使用DatagramSocket.send(DatagramPacket p)方法发送数据时,发送的数据是由DatagramPacket对象中的offset变量与length变量共同决定的,是buf中从offset变量开始length长度字节的数据。

问题:DatagramPacket类对象被多线程复用时的线程安全问题?

Java UDP中使用的DatagramSocket究竟是什么?

DatagramSocket就是基于UDP协议的编程模型,是UDP的应用层编程接口API,通过它应用层就可以访问UDP提供的服务。windows版JDK1.8中DatagramSocket类中相关操作实际是通过DatagramSocketImpl类实现的,DatagramSocketImpl类包含的成员变量如下:

public abstract class DatagramSocketImpl implements SocketOption {
    //The local port number.
    protected int localPort;

    //The file descriptor object.
    protected FileDescriptor fd;
}

可见对于每个DatagramSocketImpl子类的对象,其中均包含这几个变量,基于DatagramSocket进行的send,receive等操作实际上都是在这几个成员变量的基础上进行的。
DatagramSocketImpl UML图

图1:windows版JDK1.8中DatagramSocketImpl UML图

服务端与客户端都采用DatagramSocket对象进行数据传输,区别仅在于创建DatagramSocket对象时服务端绑定的是熟知端口,而客户端绑定的是随机端口。通过new DatagramSocket(int port)创建对象之后即开始在本地端口进行监听。下面是创建一个DatagramSocket对象涉及到的JDK源码简单分析:

public class DatagramSocket implements java.io.Closeable {
    public DatagramSocket(int port) throws SocketException {
        this(port, null);
    }

    public DatagramSocket(int port, InetAddress laddr) throws SocketException {
        this(new InetSocketAddress(laddr, port));
    }

    //最终调用这个构造方法创建DatagramSocket对象
    public DatagramSocket(SocketAddress bindaddr) throws SocketException {
        // create a datagram socket.
        createImpl();
        if (bindaddr != null) {
            try {
                bind(bindaddr);
            } finally {
                if (!isBound())
                    close();
            }
        }
    }
	
    void createImpl() throws SocketException {
        if (impl == null) {
			//可以提供了自己的DatagramSocketImpl类
            if (factory != null) {
                impl = factory.createDatagramSocketImpl();
                checkOldImpl();
            } else {
                boolean isMulticast = (this instanceof MulticastSocket) ? true : false;
				//采用默认的DatagramSocketImpl类
                impl = DefaultDatagramSocketImplFactory.createDatagramSocketImpl(isMulticast);
                checkOldImpl();
            }
        }
        // creates a udp socket
		//调用的是父类AbstractPlainDatagramSocketImpl中create方法为DatagramSocketImpl分配对应的文件描述符等资源。
        impl.create();
        impl.setDatagramSocket(this);
        created = true;
    }
}

class DefaultDatagramSocketImplFactory {
    static DatagramSocketImpl createDatagramSocketImpl(boolean isMulticast)
        throws SocketException {
        if (prefixImplClass != null) {
            try {
                return (DatagramSocketImpl) prefixImplClass.newInstance();
            } catch (Exception e) {
                throw new SocketException("can't instantiate DatagramSocketImpl");
            }
        } else {
            if (useDualStackImpl && !isMulticast)
				//win10中实际采用的是下面这个类创建DatagramSocket实现
                return new DualStackPlainDatagramSocketImpl(exclusiveBind);
            else
                return new TwoStacksPlainDatagramSocketImpl(exclusiveBind && !isMulticast);
        }
    }
}

abstract class AbstractPlainDatagramSocketImpl extends DatagramSocketImpl {
    protected synchronized void create() throws SocketException {
        ResourceManager.beforeUdpCreate();
        fd = new FileDescriptor();
        try {
			//调用的是DualStackPlainDatagramSocketImpl类的datagramSocketCreate方法
            datagramSocketCreate();
        } catch (SocketException ioe) {
            ResourceManager.afterUdpClose();
            fd = null;
            throw ioe;
        }
    }
}
  
class DualStackPlainDatagramSocketImpl extends AbstractPlainDatagramSocketImpl {
    protected void datagramSocketCreate() throws SocketException {
        if (fd == null)
            throw new SocketException("Socket closed");

        int newfd = socketCreate(false /* v6Only */);

        fdAccess.set(fd, newfd);
    }

    private static native int socketCreate(boolean v6Only);
}

DatagramSocket类的connect()操作

UDP Connect操作的本质与TCP Connect操作不同,UDP Connect的本质是向UDP控制块注册远端服务器的IP地址和端口号。UDP的Connect函数并不涉及任何UDP层的任何数据交互操作,UDP数据传输过程中仍然是无连接、不可靠的。向UDP控制块注册远端服务器的IP地址和端口号有什么用处呢?

基于UDP的无连接,UDP可以给多个IP发送数据包,就是UDP的一对多,这其中有UDP广播和多播通信。试想我们现在有个需求是调用datagramSocket.receive()等待接收指定服务器发过来的响应数据,但是此时有个其它服务器的数据包发过来了,打断了我们receive操作的阻塞,这就与我们预想的不符;通过调用datagramSocket.connect()方法指定了连接的服务器IP和端口之后,在调用connect方法的一端只会接收来自指定服务器IP和端口的数据,来自其它来源的数据都会被静静的丢弃,程序不会有任何察觉。在调用connect方法的一端也只会向指定服务器发送数据,若向其它服务器发送数据,会抛出IllegalArgumentException。特殊的是如果指定的这个SocketAddress 是个多播地址或者广播地址,那么只允许向这个地址发送数据包,不允许从这个地址接收数据包。

UDP通过这样的方式,限定了单向的通信,但是注意的是,这里的限定只是仅限于一方,而并没有限制另外一方,另外一方依旧可以向多个IP地址发送数据包,接收多个地址的数据包。

DatagramSocket类的send()操作

在UDP连接一方发送数据的过程中,调用send()方法将报文发送到出队列。UDP逐个将报文取出,将报文加上UDP头部后递交给IP层。出队列可能发生溢出,如果发生了溢出,操作系统就要求客户进程在继续发送报文之前要等待。send()操作源码简单分析如下:

public class DatagramSocket implements java.io.Closeable {
    public void send(DatagramPacket p) throws IOException  {
        InetAddress packetAddress = null;
        synchronized (p) {
            ...
			//判断DatagramSocket是否调用过connect方法
            if (connectState == ST_NOT_CONNECTED) {
                ...                
            } else {
                // we're connected
                packetAddress = p.getAddress();
                if (packetAddress == null) {
                    p.setAddress(connectedAddress);
                    p.setPort(connectedPort);
                } else if ((!packetAddress.equals(connectedAddress)) ||
                           p.getPort() != connectedPort) {
					//若当前DatagramPacket中指定的端口号与之前connect的端口号不同,会抛出异常
                    throw new IllegalArgumentException("connected address and packet address differ");
                }
            }
            // Check whether the socket is bound
            if (!isBound())
                bind(new InetSocketAddress(0));
            // call the  method to send
			//实际调用DualStackPlainDatagramSocketImpl的send方法
            getImpl().send(p);
        }
    }
}

class DualStackPlainDatagramSocketImpl extends AbstractPlainDatagramSocketImpl {
    protected void send(DatagramPacket p) throws IOException {
        int nativefd = checkAndReturnNativeFD();

        if (p == null)
            throw new NullPointerException("null packet");

        if (p.getAddress() == null ||p.getData() ==null)
            throw new NullPointerException("null address || null buffer");
		//实际发送数据的方法
        socketSend(nativefd, p.getData(), p.getOffset(), p.getLength(),
                   p.getAddress(), p.getPort(), connected);
    }

    private static native void socketSend(int fd, byte[] data, int offset, int length, InetAddress address, int 	port, boolean connected) throws IOException;
}

问题:多线程使用同一个DatagramSocket对象,每个线程持有不同的DatagramPacket类对象,调用send方法给不同的服务器发数据,是否需要考虑线程安全问题?

我觉得应该不会有由于线程之间竞争导致一个包发送一半之类的发包失败的情况,但是发包顺序如果不进行同步操作的话则无法保证。

有时可能使用两个线程同时持有一个DatagramSocket对象,一个线程进行发送,另一个线程进行接收操作;但两个线程使用同一个DatagramSocket对象均进行发送操作或者接收操作,则不常见,使用时需要注意线程间同步问题。

DatagramSocket类的receive()操作

接收方接收数据时会将接收到的用户数据报放在该队列的末尾,如果接收队列为空,receive方法会阻塞直到有新数据到来,当然也可以通过setSoTimeout(int timeout)设置receive的超时时间,时间到后会抛出异常。如果新数据到来时接收队列已满,UDP就丢弃这个新到来的用户数据报,并请求向客户发送端口不可达报文。receive()操作源码简单分析如下:

public class DatagramSocket implements java.io.Closeable {
    public synchronized void receive(DatagramPacket p) throws IOException {
        synchronized (p) {
            ...
            DatagramPacket tmp = null;
            if ((connectState == ST_CONNECTED_NO_IMPL) || explicitFilter) {
                // We have to do the filtering the old fashioned way since
                // the native impl doesn't support connect or the connect
                // via the impl failed, or .. "explicitFilter" may be set when
                // a socket is connected via the impl, for a period of time
                // when packets from other sources might be queued on socket.
                boolean stop = false;
                while (!stop) {
                    InetAddress peekAddress = null;
                    int peekPort = -1;
                    // peek at the packet to see who it is from.
					//在将UDP数据报从接收队列中取出之前先利用peekData方法查看此数据报的IP和端口是否与Connect操作中指定的一致
                    if (!oldImpl) {
                        // We can use the new peekData() API
                        DatagramPacket peekPacket = new DatagramPacket(new byte[1], 1);
                        peekPort = getImpl().peekData(peekPacket);
                        peekAddress = peekPacket.getAddress();
                    } else {
                        // this api only works for IPv4
                        peekAddress = new InetAddress();
                        peekPort = getImpl().peek(peekAddress);
                    }
                    if ((!connectedAddress.equals(peekAddress)) ||
                        (connectedPort != peekPort)) {
                        // throw the packet away and silently continue
				//如果当前数据报的IP和端口是否与Connect操作中指定的不一致,则静静的丢弃此数据报,直接进行下一次receive操作
                        tmp = new DatagramPacket(
                                                new byte[1024], 1024);
                        getImpl().receive(tmp);
                        if (explicitFilter) {
                            if (checkFiltering(tmp)) {
                                stop = true;
                            }
                        }
                    } else {
                        stop = true;
                    }
                }
            }
            // If the security check succeeds, or the datagram is
            // connected then receive the packet
			//调用AbstractPlainDatagramSocketImpl的receive方法
            getImpl().receive(p);
            if (explicitFilter && tmp == null) {
                // packet was not filtered, account for it here
                checkFiltering(p);
            }
        }
    }
}

abstract class AbstractPlainDatagramSocketImpl extends DatagramSocketImpl {
    protected synchronized void receive(DatagramPacket p)
        throws IOException {
		//调用DualStackPlainDatagramSocketImpl的receive0方法
        receive0(p);
    }
}

class DualStackPlainDatagramSocketImpl extends AbstractPlainDatagramSocketImpl {
    protected synchronized void receive0(DatagramPacket p) throws IOException {
        int nativefd = checkAndReturnNativeFD();

        if (p == null)
            throw new NullPointerException("packet");
        if (p.getData() == null)
            throw new NullPointerException("packet buffer");

        socketReceiveOrPeekData(nativefd, p, timeout, connected, false /*receive*/);
    }

    private static native int socketReceiveOrPeekData(int fd, DatagramPacket packet,
        int timeout, boolean connected, boolean peek) throws IOException;
}

DatagramSocket类的close()操作

使用完DatagramSocket类要释放资源,DatagramSocket类关闭方法较简单,只有调用close方法一种方式,可以调用isClosed()判断当前DatagramSocket是否被关闭。

DatagramSocket类的相关选项设置

  • setSoTimeout(int timeout):设置receive操作超时时间,0代表永不超时,当阻塞时间超过指定时间时,会抛出SocketTimeoutException,如果捕获这个异常的话,仍可以继续使用DatagramSocket receive操作接收数据。
  • setSendBufferSize(int size):设置发送缓冲区大小,即数据发送队列大小,默认为64K。仅仅只是给操作系统一个建议,实际使用的SendBufferSize还需要通过getSendBufferSize()确定。若发送缓冲区大小设置为128K,发出的每个数据包大小为8K,在缓冲区中最多可以存储8个数据包。当操作系统从数据发送队列中取出数据发送的速度小于上层应用向发送队列中发送数据的速度时,就有可能造成数据堆积在发送队列中。
  • setReceiveBufferSize(int size):设置接收缓冲区大小,即数据接收队列大小,默认为64K。仅仅只是给操作系统一个建议,实际使用的ReceiveBufferSize还需要通过getReceiveBufferSize()确定。若接收缓冲区大小设置为128K,接收的每个数据包大小为8K,在缓冲区中最多可以存储8个数据包。如果数据到来的速度太快,接收应用来不及处理,UDP就丢弃这个用户数据报,并请求向客户发送端口不可达报文。
  • setReuseAddress(boolean on):设置端口复用,并不是所有类型的操作系统都支持这个参数,如果不支持此方法调用将会被os忽略,默认为false。当端口复用开启时,可以使用多个DatagramSocket对象监听同一个端口。此参数必须在DatagramSocket bind到某个端口之前被使用。
    try(DatagramSocket socket = new DatagramSocket(null)) {
              socket.setReuseAddress(true);
              socket.bind(new InetSocketAddress(host, port));
    }
    

UDP数据包的发送和接收问题

  • UDP的通信有界性: 在阻塞模式下,UDP的通信是以数据包作为界限的,即使server端的缓冲区再大也要按照client发包的次数来多次接收数据包,server只能一次一次的接收,client发送多少次,server就需接收多少次,即客户端分几次发送过来,服务端就必须按几次接收。
  • UDP数据包的无序性和非可靠性: client依次发送1、2、3三个UDP数据包,server端先后调用3次接收函数,可能会依次收到3、2、1次序的数据包,收包可能是1、2、3的任意排列组合,也可能丢失一个或多个数据包。
  • UDP数据包的接收: client发送两次UDP数据,第一次 500字节,第二次300字节,server端阻塞模式下接包,第一次recvfrom(1000),收到是 1000,还是500,还是300,还是其他? 由于UDP通信的有界性,接收到只能是500或300,又由于UDP的无序性和非可靠性,接收到可能是300,也可能是500,也可能一直阻塞在recvfrom调用上,直到超时返回(也就是什么也收不到)。 在假定数据包是不丢失并且是按照发送顺序按序到达的情况下,server端阻塞模式下接包,先后三次调用:recvfrom(200),recvfrom(1000),recvfrom(1000),接收情况如何呢? 由于UDP通信的有界性,第一次recvfrom(200)将接收第一个500字节的数据包,但是因为用户空间buf只有200字节,于是只会返回前面200字节,剩下300字节将丢弃。第二次recvfrom(1000)将返回300字节,第三次recvfrom(1000)将会阻塞。
  • UDP包分片问题: 如果MTU是1500,Client发送一个8000字节大小的UDP包,那么Server端阻塞模式下接包,在不丢包的情况下,recvfrom(9000)是收到1500,还是8000。如果某个IP分片丢失了,recvfrom(9000),又返回什么呢? 根据UDP通信的有界性,在buf足够大的情况下,接收到的一定是一个完整的数据包,UDP数据在下层的分片和组片问题由IP层来处理,提交到UDP传输层一定是一个完整的UDP包,那么recvfrom(9000)将返回8000。如果某个IP分片丢失,udp里有个CRC检验,如果包不完整就会丢弃,也不会通知是否接收成功,所以UDP是不可靠的传输协议,那么recvfrom(9000)将阻塞。

更多关于UDP数据包发送接收问题,可参考:告知你不为人知的UDP-疑难杂症和使用


TCP编程相关

Socket Server端与Client端Demo

Server端采用线程池限制过多的资源使用,连接数较大时,server可能拒绝连接,但起码不会崩溃。

public class PooledSocketServer {
    private static final int PORT = 8000;
    private static ExecutorService pool = Executors.newFixedThreadPool(50);

    public static void main(String[] args) {
        try(ServerSocket serverSocket = new ServerSocket(PORT)) {
            while (true) {
                Socket connection = serverSocket.accept();
                pool.submit(new TestTask(connection));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static class TestTask implements Callable<Void> {
        private Socket connection;

        TestTask(Socket connection) {
            this.connection = connection;
        }

        @Override
        public Void call() {
            try {
                PrintWriter writer = new PrintWriter(new OutputStreamWriter(connection.getOutputStream()));
                String send = "ping!";
                System.out.println("server send: " + send);
                writer.println(send);
                writer.flush();
                BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
                String receive = reader.readLine();
                System.out.println("server receive: " + receive);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return null;
        }
    }
}

public class SocketClient {
    private static final String SERVERIP = "localhost";
    private static final int PORT = 8000;
    private static final int TIMEOUT = 60000;//超时时间,以ms为单位

    public static void main(String[] args) {
        try(Socket socket = new Socket()) {
            SocketAddress socketAddress = new InetSocketAddress(SERVERIP, PORT);
            socket.connect(socketAddress, TIMEOUT);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())));
            String receive = reader.readLine();
            System.out.println("client receive: " + receive);
            if (receive != null && receive.equals("ping!")) {
                String send = "pang!";
                writer.print(send);
                writer.flush();
                System.out.println("client send: " + receive);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Socket究竟是什么?

Socket就是基于TCP/IP协议的编程模型,是TCP的应用层编程接口API,通过它应用层就可以访问TCP提供的服务。windows版JDK1.8中Socket类中相关操作实际是通过SocketImpl类实现的,SocketImpl类包含的成员变量如下:

public abstract class SocketImpl implements SocketOptions {
    //The file descriptor object for this socket.
    protected FileDescriptor fd;

    //The IP address of the remote end of this socket.
    protected InetAddress address;

    //The port number on the remote host to which this socket is connected.
    protected int port;

    //The local port number to which this socket is connected.
    protected int localport;
}

可见对于每个SocketImpl子类的对象,其中均包含这几个变量,基于socket进行的connect、read、write等操作实际上都是在这几个成员变量的基础上进行的。
SocketImpl UML图

图2:windows版JDK1.8中SocketImpl UML图

Socket Connect操作

Socket Connect连接建立的过程实际就是与服务器端进行三次握手的过程,connect()方法是阻塞的,可能阻塞在等待服务器响应syn发回syn+ack数据包的过程中。 常见的客户端与服务器建立连接方式有两种:

Socket s = new Socket(String host, String port);

或者

Socket s = new Socket();
SocketAddress socketAddress = new InetSocketAddress(serverIp, port);
s.connect(socketAddress, timeout);//超时时间以ms为单位

上面的方式默认永不超时,直到客户端发现连接无法建立抛出异常;下面的方式在创建连接时可以指定超时时间,当超过指定时间连接还未建立,就会抛出异常停止尝试建立连接,当然,也有可能还未达到我们设置的timeout时间即抛出异常超时。什么情况下有可能抛出连接建立超时异常呢?

  • 由于网络的问题,TCP/IP三次握手时间>timeout的设置时间。这在国外访问weibo时,并且网络环境极差的情况下有可能发生。解决的办法:调大socket.connect方法中的timeout参数值,比如50s,linux默认最高是70s,如果超过70s没有意义,linux会采用70s。但是当调大之后,发现还是不到10s就报timeout exception。并且通过国外的机器ping api.weibo.com发现unreachable。说明客户端在传输层之下的网络层就发现连个Syn的报文都发不出去,更不用说三次握手了,客户端不用等到我们设置的timeout超时,直接失败并抛timeout exception。经验:在connection timeout诊断的第一步应该是ping一下确认网络层没有问题。注:客户端设置了timeout,但并不会等到超时才返回异常。客户端只要第一时间发现连接失败,就会抛timeout exception。
  • 如果timeout设置的时间足够,但是由于服务器端的处理能力较差,比如缓冲连接队列较小,而应用层的处理能力没有连接缓冲快,导致缓冲连接占满,而拒绝新的连接。在服务端因为连接队列占满而拒绝服务的期间,客户端通过TCP协议重试连接三次。每次的时间翻倍。如果三次时间的累加<timeout参数值且能连接上,属于正常情况,表示队列腾出空位放当前连接。如果三次时间的累加<timeout参数值且未能连接上,则客户端会立刻抛出timeout exception,而不是等到timeout到期才抛。需要注意的是服务端通过ServerSocket.accept()从缓冲连接队列中取出连接,在服务器将连接从缓冲队列取出之前,三次握手后的连接都在缓冲队列中保存。只要客户端与服务端三次握手建立完成,客户端就可以向服务器发送数据,即使服务器没有将此客户端连接从缓冲连接队列中取出,客户端发送的数据也不会丢失。

下面从以第一种方式new Socket(String host, String port)基于Socket类相关源码来分析连接建立过程,第二种方式原理与之大致相同。

public Socket(String host, int port) throws UnknownHostException, IOException
{
	//若输入的host是域名,则根据DNS进行域名查找的过程是阻塞的
    this(host != null ? new InetSocketAddress(host, port) :
         new InetSocketAddress(InetAddress.getByName(null), port),
         (SocketAddress) null, true);
}
private Socket(SocketAddress address, SocketAddress localAddr,
                   boolean stream) throws IOException {
	//设置socket的实际实现,为什么不直接在socket方法中实现,而是又添加一层socketImpl外包装呢?
	//因为socket可以有许多种不同实现,使用socketImpl通过多态可灵活切换不同实现,相比于将不同实现方式都写在socket类中更清晰易懂
    setImpl();

    // backward compatibility
    if (address == null)
        throw new NullPointerException();

    try {
		//在本地调用操作系统TCP/IP相关API为socket创建相应文件描述符
        createImpl(stream);
		//为socket绑定相应的本地端口
        if (localAddr != null)
            bind(localAddr);
		//如果创建socket时候并没有指定本地端口,则会在connect方法中自动指定使用的端口
		//connect方法会尝试与远程服务器进行三次握手建立连接,可能阻塞
        connect(address);
    } catch (IOException | IllegalArgumentException | SecurityException e) {
        try {
            close();
        } catch (IOException ce) {
            e.addSuppressed(ce);
        }
        throw e;
    }
}
void setImpl() {
    if (factory != null) {
		//如果用户提供了SocketImplFactory,则使用用户提供的方法创建socketImpl。
        impl = factory.createSocketImpl();
        checkOldImpl();
    } else {
        // No need to do a checkOldImpl() here, we know it's an up to date
        // SocketImpl!
        impl = new SocksSocketImpl();
    }
    if (impl != null)
        impl.setSocket(this);
}
//问:java.net.socket的createImpl方法还能用于创建UDP实现呢?
//@param stream a {@code boolean} value : {@code true} for a TCP socket, {@code false} for UDP.
void createImpl(boolean stream) throws SocketException {
    if (impl == null)
        setImpl();
    try {
        impl.create(stream);
        created = true;
    } catch (IOException e) {
        throw new SocketException(e.getMessage());
    }
}
/**AbstractPlainSocketImpl类doConnect方法是实现socket连接的主要操作,JDK官方文档中对其描述如下:
 * The workhorse of the connection operation.  Tries several times to
 * establish a connection to the given <host, port>.  If unsuccessful,
 * throws an IOException indicating what went wrong.
 */
synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
    synchronized (fdLock) {
        if (!closePending && (socket == null || !socket.isBound())) {
            NetHooks.beforeTcpConnect(fd, address, port);
        }
    }
    try {
        acquireFD();
        try {
		//根据操作系统不同,调用不同socketConnect的实现,比如win10调用的是DualStackPlainSocketImpl类的socketConnect方法
            socketConnect(address, port, timeout);
            /* socket may have been closed during poll/select */
            synchronized (fdLock) {
                if (closePending) {
                    throw new SocketException ("Socket closed");
                }
            }
            // If we have a ref. to the Socket, then sets the flags
            // created, bound & connected to true.
            // This is normally done in Socket.connect() but some
            // subclasses of Socket may call impl.connect() directly!
            if (socket != null) {
                socket.setBound();
                socket.setConnected();
            }
        } finally {
            releaseFD();
        }
    } catch (IOException e) {
        close();
        throw e;
    }
}
//DualStackPlainSocketImpl类的socketConnect方法
void socketConnect(InetAddress address, int port, int timeout) throws IOException {
    int nativefd = checkAndReturnNativeFD();

    if (address == null)
        throw new NullPointerException("inet address argument is null.");

    int connectResult;
    if (timeout <= 0) {
		//如果未设置超时时间(即超时时间为0)或超时时间小于0,阻塞直到连接建立完成或抛出异常
        connectResult = connect0(nativefd, address, port);
    } else {
		//阻塞直到连接建立完成或抛出异常,阻塞时间最大值为设置的timeout,超过则抛出异常
        configureBlocking(nativefd, false);
        try {
            connectResult = connect0(nativefd, address, port);
            if (connectResult == WOULDBLOCK) {
                waitForConnect(nativefd, timeout);
            }
        } finally {
            configureBlocking(nativefd, true);
        }
    }
    /*连接建立之后根据nativefd设置本地端口
     * We need to set the local port field. If bind was called
     * previous to the connect (by the client) then localport field
     * will already be set.
     */
    if (localport == 0)
        localport = localPort0(nativefd);
}

Socket Read操作

Socket read操作与文件读取操作本质上是相同的,只不过数据来源不同,Socket read数据来源是连接对方通过网络发来的数据,文件读取操作数据来源是磁盘上的数据,两种read操作都是阻塞的,并且阻塞都分为两个阶段,一是等待数据在内核空间中被准备好,二是将数据从内核空间拷贝到用户空间。Socket read操作直接操作的数据结构是内核空间中的TCP数据接收队列,比如我们当前想一次性读取50个字节,若TCP接收数据队列中有可读的数据,则read操作返回值为当前缓冲区中可一次性读取的字节数(最大为50,可能小于50),若没有可读的数据,则read操作会阻塞直到有数据准备好。

若客户端设置了SoTimeout选项,比如设置为阻塞5s即超时,超时后对于客户端会抛出异常 java.net.SocketTimeoutException: Read timed out,SocketTimeoutException属于checkedException,若客户端catch此Exception,客户端与服务器建立的socket连接还可以进行正常读写,否则客户端程序停止执行,socket连接被关闭,服务器端基于此socket进行的读操作当内核接收缓冲区数据为空时会直接返回-1。问题:socket连接关闭后,服务器端基于此socket进行的写操作会产生什么问题呢?

对于TCP接收数据缓冲区,我们可以通过setReceiveBufferSize(int size) 来设置其大小,在默认情况下,输入流的接收缓冲区是65536个字节(64K)。这个值是Java所建议的输入缓冲区的大小。但最好不要将输入缓冲区设得太小,否则会导致传输数据过于频繁,从而降低网络传输的效率。

对于文件读写操作,读到文件尾时read会返回-1,那么对于基于socket的read操作,什么时候会返回-1呢?答案是当一方关闭了输出时(比如调用shutdownOutput方法或者出现异常关闭整个socket),另一方在接收缓冲区数据为空的条件下进行读取就会返回-1。具体可参见:TCP Socket通信中由read返回值造成的的死鎖問題

通过java.net.socket进行read操作源码分析如下:

public class Socket implements java.io.Closeable {
    public InputStream getInputStream() throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!isConnected())
            throw new SocketException("Socket is not connected");
        if (isInputShutdown())
            throw new SocketException("Socket input is shutdown");
        final Socket s = this;
        InputStream is = null;
        try {
            is = AccessController.doPrivileged(
                new PrivilegedExceptionAction<InputStream>() {
                    public InputStream run() throws IOException {
						//实际调用的是AbstractPlainSocketImpl类的getInputStream()方法
                        return impl.getInputStream();
                    }
                });
        } catch (java.security.PrivilegedActionException e) {
            throw (IOException) e.getException();
        }
        return is;
    }
}
abstract class AbstractPlainSocketImpl extends SocketImpl {
    protected synchronized InputStream getInputStream() throws IOException {
        synchronized (fdLock) {
            if (isClosedOrPending())
                throw new IOException("Socket Closed");
            if (shut_rd)
                throw new IOException("Socket input is shutdown");
            if (socketInputStream == null)
				//真正的InputStream实现类是SocketInputStream
                socketInputStream = new SocketInputStream(this);
        }
        return socketInputStream;
    }
}
class SocketInputStream extends FileInputStream {
    public int read() throws IOException {
        if (eof) {
            return -1;
        }
        temp = new byte[1];
        int n = read(temp, 0, 1);
        if (n <= 0) {
            return -1;
        }
        return temp[0] & 0xff;
    }
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
    }
    public int read(byte b[], int off, int length) throws IOException {
        return read(b, off, length, impl.getTimeout());
    }
    //实际的read方法实现
    int read(byte b[], int off, int length, int timeout) throws IOException {
        int n;

        // EOF already encountered
        if (eof) { return -1; }

        // connection reset
        if (impl.isConnectionReset()) { throw new SocketException("Connection reset"); }

        // bounds check
        if (length <= 0 || off < 0 || off + length > b.length) {
            if (length == 0) {
                return 0;
            }
            throw new ArrayIndexOutOfBoundsException();
        }

        boolean gotReset = false;

        // acquire file descriptor and do the read
        FileDescriptor fd = impl.acquireFD();
        try {
			//调用socketRead方法进行读取,指定超时时间
            n = socketRead(fd, b, off, length, timeout);
            if (n > 0) {
                return n;
            }
        } catch (ConnectionResetException rstExc) {
            gotReset = true;
        } finally {
            impl.releaseFD();
        }

        /*
         * We receive a "connection reset" but there may be bytes still
         * buffered on the socket
         */
        if (gotReset) {
            impl.setConnectionResetPending();
            impl.acquireFD();
            try {
                n = socketRead(fd, b, off, length, timeout);
                if (n > 0) {
                    return n;
                }
            } catch (ConnectionResetException rstExc) {
            } finally {
                impl.releaseFD();
            }
        }

        /*
         * If we get here we are at EOF, the socket has been closed,
         * or the connection has been reset.
         */
        if (impl.isClosedOrPending()) {
            throw new SocketException("Socket closed");
        }
        if (impl.isConnectionResetPending()) {
            impl.setConnectionReset();
        }
        if (impl.isConnectionReset()) {
            throw new SocketException("Connection reset");
        }
        eof = true;
        return -1;
    }
    private int socketRead(FileDescriptor fd,
                           byte b[], int off, int len,
                           int timeout)
        throws IOException {
		//调用native方法socketRead0进行读取
        return socketRead0(fd, b, off, len, timeout);
    }
    private native int socketRead0
		(FileDescriptor fd, byte b[], int off, int len, int timeout) throws IOException;
}

调用通过socket获取的InputStream的read操作后关闭的实际是整个socket,也就是说名义上关闭的是读取操作,实际上关闭后对于此socket的写操作也无法进行了。

class SocketInputStream extends FileInputStream {
    public void close() throws IOException {
        // Prevent recursion. See BugId 4484411
        if (closing)
            return;
        closing = true;
        if (socket != null) {
            if (!socket.isClosed())
				//实际关闭的是整个socket
                socket.close();
        } else
            impl.close();
        closing = false;
    }
}

Socket Write操作

Write操作直接操作的是底层TCP输出数据缓冲区,每次Write操作都会将指定要写出的数据全部写出之后才返回,如果TCP输出数据缓冲区大小不足以容纳当前要写出的字节数,write操作将会阻塞直到缓冲区中有足够空间,并且write操作不能像read操作一样指定阻塞超时时间,如果缓冲区中一直没有足够空间,write操作会一直阻塞下去。一个Write操作阻塞的示例如下:

public class Client {
    private static final String SERVERIP = "127.0.0.1";
    private static final int PORT = 8098;

    public static void main(String[] args) throws IOException, InterruptedException {
        try(Socket socket = new Socket()) {
            SocketAddress socketAddress = new InetSocketAddress(SERVERIP, PORT);//连接远程8098端口
            socket.setSendBufferSize(8 * 1024);//设置客户端发送缓冲区大小为8k
            socket.connect(socketAddress, 50000);//设置连接超时时间为50000ms         
            byte[] buffer = new byte[1024];
            Arrays.fill(buffer, (byte) 1);
            OutputStream os = socket.getOutputStream();
            for (int i = 0; i < 1024; i++) {
                os.write(buffer);      
                System.out.println("第" + i + "次写出" + " buffer size: " + socket.getSendBufferSize());
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class Server {
    private static final int PORT = 8098;

    public static void main(String[] args) {
        try(ServerSocket serverSocket = new ServerSocket(PORT)) {
            while (true) {           
                serverSocket.setReceiveBufferSize(8 * 1024);//设置接收数据缓冲区大小为8k
                Socket client = serverSocket.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {                       
                        while (true) {
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

执行结果为:
	0次写出 buffer size: 8192
	1次写出 buffer size: 8192
	...
	23次写出 buffer size: 8192

由执行结果可以看出由于服务端一直没有读取数据,客户端write操作在第24次写出之后就阻塞了,问题:客户端发送缓冲区大小为8K,服务端接收缓冲区大小也为8K,理论来讲客户端发送16K数据之后不是就应该阻塞?为何可以发送24K数据?TCP滑动窗口大小与接收缓冲区大小

什么情况下会造成TCP输出数据缓冲区不足以容纳当前要写入的数据呢?

我们知道TCP滑动窗口大小是由网络拥塞状况与客户端接收窗口大小共同决定的,假定网络环境通畅,一种极端情况就是接收端对于接收到的数据一直不进行read操作导致接收端输入数据缓冲区满,接收端滑动窗口大小变为0,发送端无法再通过网络将发送端发送缓冲区的数据发送出去,造成发送端发送缓冲区满,write操作阻塞。可以通过SendBufferSize方法设置TCP输出数据缓冲区大小。

由此也可理解为何对于socket来讲发送数据时的flush操作没有作用,这是因为数据能否发出不是由自己决定而是由对方或者网络状况决定的。通过分析SocketOutputStream源码可见其实际调用的是OutputStream类的flush方法,并没有进行任何操作。

socket通过SocketOutputStream类进行实际write操作,SocketOutputStream相关源码如下:

class SocketOutputStream extends FileOutputStream {
    public void write(int b) throws IOException {
        temp[0] = (byte)b;
        socketWrite(temp, 0, 1);
    }

    public void write(byte b[]) throws IOException {
        socketWrite(b, 0, b.length);
    }

    public void write(byte b[], int off, int len) throws IOException {
        socketWrite(b, off, len);
    }

    private void socketWrite(byte b[], int off, int len) throws IOException {

        if (len <= 0 || off < 0 || off + len > b.length) {
            if (len == 0) {
                return;
            }
            throw new ArrayIndexOutOfBoundsException();
        }

        FileDescriptor fd = impl.acquireFD();
        try {
            socketWrite0(fd, b, off, len);
        } catch (SocketException se) {
            if (se instanceof sun.net.ConnectionResetException) {
                impl.setConnectionResetPending();
                se = new SocketException("Connection reset");
            }
            if (impl.isClosedOrPending()) {
                throw new SocketException("Socket closed");
            } else {
                throw se;
            }
        } finally {
            impl.releaseFD();
        }
    }
	
    //实际调用native write操作
    private native void socketWrite0(FileDescriptor fd, byte[] b, int off,
                                     int len) throws IOException;
	
    //对于输出流的关闭操作实际上是关闭的整个socket
    public void close() throws IOException {
        // Prevent recursion. See BugId 4484411
        if (closing)
            return;
        closing = true;
        if (socket != null) {
            if (!socket.isClosed())
                socket.close();
        } else
            impl.close();
        closing = false;
    }
}

Socket Close操作

  • socket.shutdownInput():假设A与B建立了Socket连接,在A端调用此方法,A端接收缓冲区剩余的数据被读取之后再进行read操作会返回-1;对于B端能否感知到A端输入已经关闭?可以感知到,B端在A端关闭输入后向A端发送数据是在B端会抛出异常,对于A端在关闭输入之后会将接收到的数据静静的丢弃。
  • socket.shutdownOutput():假设A与B建立了Socket连接,在A端调用此方法,A端发送缓冲区剩余的数据会依次发送给B端,之后在A端再向B写数据会抛出IOException。对于B端,将输入缓冲区数据读完之后read操作会返回-1。
  • socket.getInputStream().close():关闭整个socket。
  • socket.getOutStream().close():关闭整个socket。
  • socket.close():关闭整个socket。

采用shutdownInput或shutdownOutput关闭连接的一半或者将连接的两半都关闭,使用结束后仍需关闭该socket,shutdown方法只影响socket的流,它们并不释放与Socket关联的资源,如占用的端口等。

Socket关闭时发送缓冲区与接收缓冲区中均有可能存在数据,socket关闭后,接收缓冲区中存在的数据不能再被读取(否则会抛出异常java.net.SocketException: socket closed),发送缓冲区中的数据也会被丢弃。可以通过设置setSoLinger(boolean on, int linger)方法阻塞close方法一段时间,在关闭之前尽可能多的将发送缓冲区中剩余数据送出。

Socket相关选项设置及其作用

  • setTcpNoDelay(boolean on):设置TCP_NODELAY选项,true则禁用Nagle算法,false则启用Nagel算法,默认为false。有时通过禁用Nagle算法可以解决小包数据的延迟发送问题,具体可参见:神秘的40毫秒延迟与TCP_NODELAY
  • setSoTimeout(int timeout):设置read操作超时时间,以ms为单位,若不设置则默认阻塞直到接收缓冲区中有数据准备好。
  • setReceiveBufferSize(int size):设置TCP输入流的接收缓冲区大小,默认是65536个字节(64K),仅仅是给操作系统一个建议,实际的接收缓冲区大小还需通过getReceiveBufferSize获得。对于接收缓冲区,当大小小于64k时是可以在任意时刻动态更改的,当大于64k时,需要分两种情况进行处理:一、对于服务端,必须在通过serverSocket绑定到本地地址之前通过ServerSocket.setReceiveBufferSize(int)设置要创建的socket的接收缓冲区大小;二、对于客户端必须在连接到服务器之前设置接收缓冲区大小。
  • setSendBufferSize(int size):设置TCP输出流的发送缓冲区大小,默认是65536个字节(64K),仅仅是给操作系统一个建议,实际的发送缓冲区大小还需通过setSendBufferSize获得。
  • setSoLinger(boolean on, int linger):若调用socket.close方法时发送缓冲区中还存在尚未发出的数据,则阻塞linger时间之后close方法才返回。当on为true时linger选项才有效,linger为0代表close方法立即返回。linger单位为秒,若close时发送缓冲区中并没有数据,则close方法可立即返回,不必等到linger时间。
  • sendUrgentData(int data):发送一字节的UrgentData,即int data中的低八位。此urgentdata将被另一方接收的顺序是在之前已经通过SocketOutputstream发出的数据之后,在之后要通过SocketOutputstream发出的数据之前。注意:这里所指的urgentdata与TCP协议中规定的urg标志置位的数据报接收要求有所不同,TCP协议中规定当接收端的TCP接收到URG位置1的段,它就利用紧急指针的值从段中提取出紧急数据,并不按序的将其传递给接收应用程序。但java中通过sendUrgentData发送的数据在接收方缓冲区中有数据时会按照顺序被接收方接收,并不会被优先接收。
  • setOOBInline(boolean on):与sendUrgentData方法对应,只有在接收方setOOBInline设置为true时,发送方发送的urgentdata数据才能被接收方接收,否则接收方会将接收到的urgentdata静静丢弃。问题:使用sendUrgentData()方法检测连接是否断开是否合理??
    try{
        socket.sendUrgentData(0xFF);
    }catch(Exception ex){
        reconnect();
    }
    
  • setKeepAlive(boolean on):设置TCP的keep-alive选项,true代表开启,默认关闭,keep-alive选项可用来检测连接存活状态。
  • setTrafficClass(int tc):设置IP数据包头部中的服务类型,0到255之间,仅仅只是给底层网络实现一个建议,不确定是否被采用。
  • setReuseAddress(boolean on):此选项使得一个端口即使在前一个连接处于Time wait状态下也可以被绑定。默认此选项未开启,客户端最好在建立连接之前设置此选项,服务端应通过serverSocket.setReuseAddress()设置此选项。如果前一个连接绑定了某个端口并且没有处于Time Wait状态,这个端口上是不能再创建新的socket连接的。

对于发送缓冲区和接收缓冲区,大多数情况下,除非网络在某个方向上负载过大,否则默认值就很合适。一般经验是除非检测到某个问题,否则不要进行调整。即使非要调整,可以在操作系统级增加允许的最大缓冲区的大小,与调整单个socket的缓冲区大小相比,前者可以得到更大的速度提升。

Server Socket创建及监听操作

调用new ServerSocket(PORT)之后服务端就开始在本地指定端口进行监听,当有连接请求到达时,若当前连接队列中连接数没有达到请求最大积压数量,则完成三次握手,将其加入到请求连接队列。一个连接被加入呼入连接请求队列之后,不管上层应用有没有通过accept方法将此连接从队列中取出,客户端通过连接给服务端发送的数据都不会丢失。源码分析如下:

public class ServerSocket implements java.io.Closeable {
    public ServerSocket(int port) throws IOException {
        this(port, 50, null);
    }

	//此构造函数中可以指定TCP呼入连接请求队列中积压的最大连接数
    public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
		//设置实际的SocketImpl,与创建普通socket时操作完全相同
        setImpl();
        if (port < 0 || port > 0xFFFF)
            throw new IllegalArgumentException("Port value out of range: " + port);
        if (backlog < 1)
          backlog = 50;
        try {
            bind(new InetSocketAddress(bindAddr, port), backlog);
        } catch(SecurityException e) {
            close();
            throw e;
        } catch(IOException e) {
            close();
            throw e;
        }
    }
	
    private void setImpl() {
        if (factory != null) {
            impl = factory.createSocketImpl();
            checkOldImpl();
        } else {
            // No need to do a checkOldImpl() here, we know it's an up to date
            // SocketImpl!
            impl = new SocksSocketImpl();
        }
        if (impl != null)
            impl.setServerSocket(this);
    }
	
    public void bind(SocketAddress endpoint, int backlog) throws IOException {
        ...
        if (backlog < 1)
          backlog = 50;
        try {
            ...            
		    //实际调用的是socketImpl类的bind和listen操作
            getImpl().bind(epoint.getAddress(), epoint.getPort());
		    //实际最终调用的是DualStackPlainSocketImpl类的socketListen方法在指定端口开始进行socket连接请求的监听
            getImpl().listen(backlog);
            bound = true;
        } catch(...) {
		    ...
	    }
        ...
    }
	
    SocketImpl getImpl() throws SocketException {
		//检测当前socketImpl对应的文件描述符是否已经创建,如果没有,则创建
        if (!created)			
            createImpl();
        return impl;
    }

    void createImpl() throws SocketException {
        if (impl == null)
            setImpl();
        try {
			//调用的实际是AbstractPlainSocketImpl类的create方法为当前socketImpl创建文件描述符等资源
            impl.create(true);
            created = true;
        } catch (IOException e) {
            throw new SocketException(e.getMessage());
        }
    }
}

class DualStackPlainSocketImpl extends AbstractPlainSocketImpl {
    void socketListen(int backlog) throws IOException {
        int nativefd = checkAndReturnNativeFD();
        listen0(nativefd, backlog);
    }

    static native void listen0(int fd, int backlog) throws IOException;
}

Server Socket的accept操作

呼入连接请求队列:每个TCP连接的建立过程都是由操作系统内核中的TCP模块帮我们维护的,os会将三次握手后的TCP连接放入呼入连接请求队列中,我们在程序中通过serverSocket.accept()可以直接获得建立好的连接。应用层接收连接后此连接就会从呼入连接请求队列中取出。当应用层由于操作系统忙或者其他原因迟迟不对呼入连接请求队列中三次握手后的连接进行处理的话,就可能导致呼入连接请求队列满,造成的后果就是对于客户端发出的SYN建立TCP连接请求不会做出任何响应,之后客户端就会重发SYN连接数据包直到达到最大重传次数后向客户端返回TCP连接建立失败的错误。当一个TCP连接请求被加入到服务端连接请求队列之后,客户端就认为服务器进程已经准备好接收数据了,之后客户端向服务器发送数据,如果此时服务端并没有对连接请求队列中的这个请求进行处理,服务器的TCP仅将接收的数据放入此连接的接收数据缓冲队列。系统所允许的最大连接数主要取决于系统所能支持的最大线程或进程数,与呼入连接请求队列的大小无关。

accept方法实际上就是从呼入连接请求队列中取出连接并为其创建相应的socketImpl实现,并以ServerSocket对象中socketImpl对象含有的fd为基础为新创建的socket分配新的fd及其他资源的过程。由于socket中一些选项保存在fd中,所以在以serverSocket为基础创建新的socket时,这些选项的值也会被新的socket所继承,比如ReceiveBufferSize,SendBufferSize等。accept方法当呼入连接请求队列为空没有新的连接到来时会一直阻塞。accept方法是线程安全的。

public class ServerSocket implements java.io.Closeable {
    //accept方法实际上就是从呼入连接请求队列中取出连接并为其创建相应的socketImpl实现,分配新的fd的过程
    public Socket accept() throws IOException {
        ...
	    //先创建一个SocketImpl为null的socket实现
        Socket s = new Socket((SocketImpl) null);
        implAccept(s);
        return s;
    }

    protected final void implAccept(Socket s) throws IOException {
        SocketImpl si = null;
        try {
            if (s.impl == null)
              s.setImpl();
            else {
                s.impl.reset();
            }
            si = s.impl;
            s.impl = null;
            si.address = new InetAddress();
            si.fd = new FileDescriptor();
			//为何这里要设置一个中间变量si而不是直接将s.impl传递给accept方法?
			//实际调用的是PlainSocketImpl类的socketAccept方法
            getImpl().accept(si);

            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkAccept(si.getInetAddress().getHostAddress(),
                                     si.getPort());
            }
        } catch (IOException e) {
            if (si != null)
                si.reset();
            s.impl = si;
            throw e;
        } catch (SecurityException e) {
            if (si != null)
                si.reset();
            s.impl = si;
            throw e;
        }
        s.impl = si;
        s.postAccept();
    }
}

class PlainSocketImpl extends AbstractPlainSocketImpl {
    //由于synchronized关键字,同一时刻只能有一个线程阻塞在等待请求队列中有新的连接到来
    protected synchronized void accept(SocketImpl s) throws IOException {
        if (s instanceof PlainSocketImpl) {
            // pass in the real impl not the wrapper.
			//传递的是要创建的socket的实际socketImpl对象,也就是DualStackPlainSocketImpl对象
            SocketImpl delegate = ((PlainSocketImpl)s).impl;
            delegate.address = new InetAddress();
            delegate.fd = new FileDescriptor();
			//win10操作系统中实际调用的是ServerSocket对象中存储的DualStackPlainSocketImpl对象的socketAccept方法
            impl.accept(delegate);
            // set fd to delegate's fd to be compatible with older releases
			//保证要创建的socket的SocksSocketImpl对象(即wrapper对象)与其对应的DualStackPlainSocketImpl对象指向同一个fd
            s.fd = delegate.fd;
        } else {
            impl.accept(s);
        }
    }
}

class DualStackPlainSocketImpl extends AbstractPlainSocketImpl {
    void socketAccept(SocketImpl s) throws IOException {
        int nativefd = checkAndReturnNativeFD();
        if (s == null)
            throw new NullPointerException("socket is null");
        int newfd = -1;
        InetSocketAddress[] isaa = new InetSocketAddress[1];
		//实际调用的是native方法accept0为到来的连接创建新的文件描述符等资源。
        if (timeout <= 0) {
			//以当前serverSocket的fd为参数传给accept0方法,为要创建的socket产生新的fd。fd中会包含一些选项,比如
			//ReceiveBufferSize,SendBufferSize等,创建的socket中的fd中包含的这些选项会继承serverSocket中的这些选项的值。
            newfd = accept0(nativefd, isaa);
        } else {
            configureBlocking(nativefd, false);
            try {
                waitForNewConnection(nativefd, timeout);
                newfd = accept0(nativefd, isaa);
                if (newfd != -1) {
                    configureBlocking(newfd, true);
                }
            } finally {
                configureBlocking(nativefd, true);
            }
        }
        /* Update (SocketImpl)s' fd */
        fdAccess.set(s.fd, newfd);
        /* Update socketImpls remote port, address and localport */
        InetSocketAddress isa = isaa[0];
		//设置到来连接的远端端口,远端ip,本地端口
        s.port = isa.getPort();
        s.address = isa.getAddress();
        s.localport = localport;//可见由serverSocket.accept方法创建的socket对象与serverSocket共用同一个端口。  
    }
	
    static native int accept0(int fd, InetSocketAddress[] isaa) throws IOException;
}

注意:serverSocket并不会为accept产生的socket连接分配新的端口,而是与serverSocket共用一个端口,这里就会有一个疑问,为什么服务端可以多个socket共用同一个端口,传输数据时如何区分?客户端能否使用同一个端口连接不同的服务端?

理论上来说,确定一条链路,只要五元组(源IP、源端口号、目标IP、目标端口号、协议)唯一就可以了,所以这不应该是技术限制。而实际上,Linux 3.9 之后确实可以让客户端使用相同的地址来连接不同的目标,只不过要提前跟内核说好而已。所以服务端可以多个socket共用一个端口的原因是这些socket的来源地址与端口号不同,客户端使用本地地址同一端口可以与不同服务器建立连接。

具体可参见:一个人也可以建立 TCP 连接呢,这篇文章中使用python在客户端相同端口进行了两次绑定,与不同的服务器建立了socket连接,java中不管是客户端还是服务端对于指定端口上的bind()方法调用只允许进行一次,有没有在指定端口多个socket同时绑定的API?

Server Socket的close操作

ServerSocket的关闭过程其实和socket的关闭过程是完全相同的,在serverSocket关闭时,所有阻塞在accept方法上的线程都会抛出SocketException。

ServerSocket相关选项设置及其作用

  • setSoTimeout(int timeout):设置accept操作超时时间,若timeout小于等于0,代表永不超时。accept操作阻塞超过超时时间之后会抛出异常java.net.SocketTimeoutException: Accept timed out,若程序中catch此异常,serverSocket之后还可以正常使用,即可以正常进行下一次的accept或其他操作。选项必须在调用accept方法之前设置才会起作用。
  • setReuseAddress(boolean on):与socket中此选项作用相同。
  • setReceiveBufferSize(int size):作用是给serverSocket创建的socket的receiveBuffer提供一个建议值,实际产生的socket的receivebuffer大小还需要通过Socket.getReceiveBufferSize()确定。若想设置receiveBuffer值大于64K,这个方法必须在serverSocket绑定到本地地址之前被调用,这也就是说必须通过无参构造函数创建,之后调用setReceiveBufferSize(int size)设置receiveBuffer,最后调用bind方法绑定到本地地址。

后记

在Socket与ServerSocket Java实现中,最重要的就是弄清SocketImpl及其子类之间的关系,在阅读源码过程中,很多实际的socket操作都是由JNI方式调用的c语言实现,由于个人水平不足,没有继续深入阅读,以后有机会可以尝试下。

(完)