Before reading this article, you need to have a basic understanding of sockets and custom protocols. You can first check the previous article "Custom Protocol Based on Java Socket to Implement Long Connection between Android and Server (I)" to learn the relevant basic knowledge points. [[178018]] 1. Protocol Definition In the previous article, we had a brief understanding of socket programming and custom protocols. This article will go deeper on this basis to realize the long connection between Android and the server. The protocol is defined as follows: - Data Protocol (Data)
- Length (length, 32 bits)
- Version number (version, 8 bits, the first 3 bits are reserved, the last 5 bits are used to indicate the actual version number)
- Data type (type, 8 bits, 0 indicates data)
- Service type (pattion, 8 bits, 0 means push, others are undetermined)
- Data format (dtype, 8bit, 0 means json, others are undetermined)
- Message id (msgId, 32 bits)
- Text data (data)
- Data ack protocol (DataAck)
- Length (length, 32 bits)
- Version number (version, 8 bits, the first 3 bits are reserved, the last 5 bits are used to indicate the actual version number)
- Data type (type, 8 bits, 1 indicates data ack)
- ack message id (ackMsgId, 32 bits)
- Reserved information (unused)
- Heartbeat protocol (ping)
- Length (length, 32 bits)
- Version number (version, 8 bits, the first 3 bits are reserved, the last 5 bits are used to indicate the actual version number)
- Data type (type, 8 bits, 2 for heartbeat)
- Heartbeat ID (pingId, 32 bits, odd numbers are reported by the client, i.e. 1, 3, 5..., and even numbers are sent by the server, i.e. 0, 2, 4...)
- Reserved information (unused)
- Heartbeat ack protocol (pingAck)
- Length (length, 32 bits)
- Version number (version, 8 bits, the first 3 bits are reserved, the last 5 bits are used to indicate the actual version number)
- Data type (type, 8 bits, 3 means heartbeat ack)
- ack heartbeat id (pingId, 32 bits, the client reports an odd number, i.e. 1, 3, 5..., and the server sends an even number, i.e. 0, 2, 4...)
- Reserved information (unused)
2. Protocol Implementation From the above protocol definitions, we can see that the four protocols have three common elements: length, version number, and data type. Then we can first abstract a basic protocol as follows: 1. Basic Protocol - import android.util.Log;
-
- import com.shandiangou.sdgprotocol.lib.Config;
- import com.shandiangou.sdgprotocol.lib.ProtocolException;
- import com.shandiangou.sdgprotocol.lib.SocketUtil;
-
- import java.io.ByteArrayOutputStream;
-
- /**
- * Created by meishan on 16/12/1.
- * <p>
- * Protocol type: 0 for data, 1 for dataAck, 2 for ping, 3 for pingAck
- */
- public abstract class BasicProtocol {
-
- // The length is in bytes.
- public static final int LENGTH_LEN = 4; //Record the length of the entire data length value
- protected static final int VER_LEN = 1; //Version length of the protocol (the first 3 bits are reserved, and the last 5 bits are the version number)
- protected static final int TYPE_LEN = 1; //Data type length of the protocol
-
- private int reserved = 0; //reserved information
- private int version = Config.VERSION; //version number
-
- /**
- * Get the length of the entire data
- * Unit: byte
- *
- * @return
- */
- protected int getLength() {
- return LENGTH_LEN + VER_LEN + TYPE_LEN;
- }
-
- public int getReserved() {
- return reserved;
- }
-
- public void setReserved( int reserved) {
- this.reserved = reserved;
- }
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion( int version) {
- this.version = version;
- }
-
- /**
- * Get the protocol type, implemented by the subclass
- *
- * @return
- */
- public abstract int getProtocolType();
-
- /**
- * Calculate the byte[] value of the complete version number from the reserved value and the version number
- *
- * @return
- */
- private int getVer(byte r, byte v, int vLen) {
- int num = 0;
- int rLen = 8 - vLen;
- for ( int i = 0; i < rLen; i++) {
- num += (((r >> (rLen - 1 - i)) & 0x1) << (7 - i));
- }
- return num + v;
- }
-
- /**
- * Splice the sent data. Here, the protocol version, protocol type and data length are spliced. The specific content subclass will splice them again.
- * Splice in order
- *
- * @return
- */
- public byte[] genContentData() {
- byte[] length = SocketUtil.int2ByteArrays(getLength());
- byte reserved = (byte) getReserved();
- byte version = (byte) getVersion();
- byte[] ver = {(byte) getVer(reserved, version, 5)};
- byte[] type = {(byte) getProtocolType()};
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream(LENGTH_LEN + VER_LEN + TYPE_LEN);
- baos.write(length, 0, LENGTH_LEN);
- baos.write(ver, 0, VER_LEN);
- baos.write(type, 0, TYPE_LEN);
- return baos.toByteArray();
- }
-
- /**
- * Parse the entire data length
- *
- * @param data
- * @return
- */
- protected int parseLength(byte[] data) {
- return SocketUtil.byteArrayToInt(data, 0, LENGTH_LEN);
- }
-
- /**
- * Parse the reserved position
- *
- * @param data
- * @return
- */
- protected int parseReserved(byte[] data) {
- byte r = data[LENGTH_LEN]; //The first 4 bytes (0, 1, 2, 3) are the int value of the data length, which together with the version number form a byte
- return (r >> 5) & 0xFF;
- }
-
- /**
- * Parse the version number
- *
- * @param data
- * @return
- */
- protected int parseVersion(byte[] data) {
- byte v = data[LENGTH_LEN]; //Combined with the reserved bit to form a byte
- return ((v << 3) & 0xFF) >> 3;
- }
-
- /**
- * Parse the protocol type
- *
- * @param data
- * @return
- */
- public static int parseType(byte[] data) {
- byte t = data[LENGTH_LEN + VER_LEN]; //The first 4 bytes (0, 1, 2, 3) are the int value of the data length, and ver occupies one byte
- return t & 0xFF;
- }
-
- /**
- * Parse the received data. Here, the protocol version, protocol type and data length are parsed. The specific content subclass will parse it again.
- *
- * @param data
- * @return
- * @throws ProtocolException If the protocol version is inconsistent, an exception is thrown
- */
- public int parseContentData(byte[] data) throws ProtocolException {
- int reserved = parseReserved(data);
- int version = parseVersion(data);
- int protocolType = parseType(data);
- if (version != getVersion()) {
- throw new ProtocolException( "input version is error: " + version);
- }
- return LENGTH_LEN + VER_LEN + TYPE_LEN;
- }
-
- @Override
- public String toString() {
- return "Version: " + getVersion() + ", Type: " + getProtocolType();
- }
- }
The Config class and SocketUtil class involved above are as follows: - /**
- * Created by meishan on 16/12/2.
- */
- public class Config {
-
- public static final int VERSION = 1; //Protocol version number
- public static final String ADDRESS = "10.17.64.237" ; //Server address
- public static final int PORT = 9013; //Server port number
-
- }
- import java.io.BufferedInputStream;
- import java.io.BufferedOutputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.nio.ByteBuffer;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * Created by meishan on 16/12/1.
- */
- public class SocketUtil {
-
- private static Map< Integer , String> msgImp = new HashMap<>();
-
- static {
- msgImp.put(DataProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.DataProtocol" ); //0
- msgImp.put(DataAckProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.DataAckProtocol" ); //1
- msgImp.put(PingProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.PingProtocol" ); //2
- msgImp.put(PingAckProtocol.PROTOCOL_TYPE, "com.shandiangou.sdgprotocol.lib.protocol.PingAckProtocol" ); //3
- }
-
- /**
- * Parse data content
- *
- * @param data
- * @return
- */
- public static BasicProtocol parseContentMsg(byte[] data) {
- int protocolType = BasicProtocol.parseType(data);
- String className = msgImp.get(protocolType);
- BasicProtocol basicProtocol;
- try {
- basicProtocol = (BasicProtocol) Class.forName(className).newInstance();
- basicProtocol.parseContentData(data);
- } catch (Exception e) {
- basicProtocol = null ;
- e.printStackTrace();
- }
- return basicProtocol;
- }
-
- /**
- * Read data
- *
- * @param inputStream
- * @return
- * @throws SocketExceptions
- */
- public static BasicProtocol readFromStream(InputStream inputStream) {
- BasicProtocol protocol;
- BufferedInputStream bis;
-
- //The header stores the length of the entire data, represented by 4 bytes. In the following write2Stream method, the header is written first
- byte[] header = new byte[BasicProtocol.LENGTH_LEN];
-
- try {
- bis = new BufferedInputStream(inputStream);
-
- int temp ;
- int len = 0;
- while (len < header.length) {
- temp = bis.read (header, len, header.length - len) ;
- if ( temp > 0) {
- len += temp ;
- } else if ( temp == -1) {
- bis.close ();
- return null ;
- }
- }
-
- len = 0;
- int length = byteArrayToInt(header); //data length
- byte[] content = new byte[length];
- while (len < length) {
- temp = bis.read (content, len, length - len) ;
-
- if ( temp > 0) {
- len += temp ;
- }
- }
-
- protocol = parseContentMsg(content);
- } catch (IOException e) {
- e.printStackTrace();
- return null ;
- }
-
- return protocol;
- }
-
- /**
- * Write data
- *
- * @param protocol
- * @param outputStream
- */
- public static void write2Stream(BasicProtocol protocol, OutputStream outputStream) {
- BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
- byte[] buffData = protocol.genContentData();
- byte[] header = int2ByteArrays(buffData.length);
- try {
- bufferedOutputStream.write(header);
- bufferedOutputStream.write(buffData);
- bufferedOutputStream.flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Close the input stream
- *
- * @param is
- */
- public static void closeInputStream(InputStream is ) {
- try {
- if ( is != null ) {
- is . close ();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * Close the output stream
- *
- * @param os
- */
- public static void closeOutputStream(OutputStream os) {
- try {
- if (os != null ) {
- os.close ();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public static byte[] int2ByteArrays( int i) {
- byte[] result = new byte[4];
- result[0] = (byte) ((i >> 24) & 0xFF);
- result[1] = (byte) ((i >> 16) & 0xFF);
- result[2] = (byte) ((i >> 8) & 0xFF);
- result[3] = (byte) (i & 0xFF);
- return result;
- }
-
- public static int byteArrayToInt(byte[] b) {
- int intValue = 0;
- for ( int i = 0; i < b.length; i++) {
- intValue += (b[i] & 0xFF) << (8 * (3 - i)); // int occupies 4 bytes (0, 1, 2, 3)
- }
- return intValue;
- }
-
- public static int byteArrayToInt(byte[] b, int byteOffset, int byteCount) {
- int intValue = 0;
- for ( int i = byteOffset; i < (byteOffset + byteCount); i++) {
- intValue += (b[i] & 0xFF) << (8 * (3 - (i - byteOffset)));
- }
- return intValue;
- }
-
- public static int bytes2Int(byte[] b, int byteOffset) {
- ByteBuffer byteBuffer = ByteBuffer.allocate( Integer . SIZE / Byte. SIZE );
- byteBuffer.put(b, byteOffset, 4); //occupies 4 bytes
- byteBuffer.flip();
- return byteBuffer.getInt();
- }
- }
Next we implement the specific protocol. 2. DataProtocol - import android.util.Log;
-
- import com.shandiangou.sdgprotocol.lib.ProtocolException;
- import com.shandiangou.sdgprotocol.lib.SocketUtil;
-
- import java.io.ByteArrayOutputStream;
- import java.io.Serializable ;
- import java.io.UnsupportedEncodingException;
-
- /**
- * Created by meishan on 16/12/1.
- */
- public class DataProtocol extends BasicProtocol implements Serializable {
-
- public static final int PROTOCOL_TYPE = 0;
-
- private static final int PATTION_LEN = 1;
- private static final int DTYPE_LEN = 1;
- private static final int MSGID_LEN = 4;
-
- private int pattion;
- private int dtype;
- private int msgId;
-
- private String data;
-
- @Override
- public int getLength() {
- return super.getLength() + PATTION_LEN + DTYPE_LEN + MSGID_LEN + data.getBytes().length;
- }
-
- @Override
- public int getProtocolType() {
- return PROTOCOL_TYPE;
- }
-
- public int getPattion() {
- return pattion;
- }
-
- public void setPattion( int pattion) {
- this.pattion = pattion;
- }
-
- public int getDtype() {
- return dtype;
- }
-
- public void setDtype( int dtype) {
- this.dtype = dtype;
- }
-
- public void setMsgId( int msgId) {
- this.msgId = msgId;
- }
-
- public int getMsgId() {
- return msgId;
- }
-
- public String getData() {
- return data;
- }
-
- public void setData(String data) {
- this.data = data;
- }
-
- /**
- * Splice and send data
- *
- * @return
- */
- @Override
- public byte[] genContentData() {
- byte[] base = super.genContentData();
- byte[] pattion = {(byte) this.pattion};
- byte[] dtype = {(byte) this.dtype};
- byte[] msgid = SocketUtil.int2ByteArrays(this.msgId);
- byte[] data = this.data.getBytes();
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
- baos.write(base, 0, base.length); //protocol version + data type + data length + message id
- baos.write(pattion, 0, PATTION_LEN); //Business type
- baos.write(dtype, 0, DTYPE_LEN); //Business data format
- baos.write(msgid, 0, MSGID_LEN); //Message id
- baos.write(data, 0, data.length); //Business data
- return baos.toByteArray();
- }
-
- /**
- * Parse the received data in order
- *
- * @param data
- * @return
- * @throws ProtocolException
- */
- @Override
- public int parseContentData(byte[] data) throws ProtocolException {
- int pos = super.parseContentData(data);
-
- //Analysis of pattion
- pattion = data[pos] & 0xFF;
- pos += PATTION_LEN;
-
- //parse dtype
- dtype = data[pos] & 0xFF;
- pos += DTYPE_LEN;
-
- //Parse msgId
- msgId = SocketUtil.byteArrayToInt(data, pos, MSGID_LEN);
- pos += MSGID_LEN;
-
- //Parse data
- try {
- this.data = new String(data, pos, data.length - pos, "utf-8" );
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
-
- return pos;
- }
-
- @Override
- public String toString() {
- return "data: " + data;
- }
- }
3. DataAckProtocol - import com.shandiangou.sdgprotocol.lib.ProtocolException;
- import com.shandiangou.sdgprotocol.lib.SocketUtil;
-
- import java.io.ByteArrayOutputStream;
- import java.io.UnsupportedEncodingException;
-
- /**
- * Created by meishan on 16/12/1.
- */
- public class DataAckProtocol extends BasicProtocol {
-
- public static final int PROTOCOL_TYPE = 1;
-
- private static final int ACKMSGID_LEN = 4;
-
- private int ackMsgId;
-
- private String unused;
-
- @Override
- public int getLength() {
- return super.getLength() + ACKMSGID_LEN + unused.getBytes().length;
- }
-
- @Override
- public int getProtocolType() {
- return PROTOCOL_TYPE;
- }
-
- public int getAckMsgId() {
- return ackMsgId;
- }
-
- public void setAckMsgId( int ackMsgId) {
- this.ackMsgId = ackMsgId;
- }
-
- public String getUnused() {
- return unused;
- }
-
- public void setUnused(String unused) {
- this.unused = unused;
- }
-
- /**
- * Splice and send data
- *
- * @return
- */
- @Override
- public byte[] genContentData() {
- byte[] base = super.genContentData();
- byte[] ackMsgId = SocketUtil.int2ByteArrays(this.ackMsgId);
- byte[] unused = this.unused.getBytes();
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
- baos.write(base, 0, base.length); //protocol version + data type + data length + message id
- baos.write(ackMsgId, 0, ACKMSGID_LEN); //Message id
- baos.write(unused, 0, unused.length); //unused
- return baos.toByteArray();
- }
-
- @Override
- public int parseContentData(byte[] data) throws ProtocolException {
- int pos = super.parseContentData(data);
-
- //Parse ackMsgId
- ackMsgId = SocketUtil.byteArrayToInt(data, pos, ACKMSGID_LEN);
- pos += ACKMSGID_LEN;
-
- //Parse unused
- try {
- unused = new String(data, pos, data.length - pos, "utf-8" );
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
-
- return pos;
- }
-
- }
4. PingProtocol - import com.shandiangou.sdgprotocol.lib.ProtocolException;
- import com.shandiangou.sdgprotocol.lib.SocketUtil;
-
- import java.io.ByteArrayOutputStream;
- import java.io.UnsupportedEncodingException;
-
- /**
- * Created by meishan on 16/12/1.
- */
- public class PingProtocol extends BasicProtocol {
-
- public static final int PROTOCOL_TYPE = 2;
-
- private static final int PINGID_LEN = 4;
-
- private int pingId;
-
- private String unused;
-
- @Override
- public int getLength() {
- return super.getLength() + PINGID_LEN + unused.getBytes().length;
- }
-
- @Override
- public int getProtocolType() {
- return PROTOCOL_TYPE;
- }
-
- public int getPingId() {
- return pingId;
- }
-
- public void setPingId( int pingId) {
- this.pingId = pingId;
- }
-
- public String getUnused() {
- return unused;
- }
-
- public void setUnused(String unused) {
- this.unused = unused;
- }
-
- /**
- * Splice and send data
- *
- * @return
- */
- @Override
- public byte[] genContentData() {
- byte[] base = super.genContentData();
- byte[] pingId = SocketUtil.int2ByteArrays(this.pingId);
- byte[] unused = this.unused.getBytes();
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
- baos.write(base, 0, base.length); //protocol version + data type + data length + message id
- baos.write(pingId, 0, PINGID_LEN); //Message id
- baos.write(unused, 0, unused.length); //unused
- return baos.toByteArray();
- }
-
- @Override
- public int parseContentData(byte[] data) throws ProtocolException {
- int pos = super.parseContentData(data);
-
- //Parse pingId
- pingId = SocketUtil.byteArrayToInt(data, pos, PINGID_LEN);
- pos += PINGID_LEN;
-
- try {
- unused = new String(data, pos, data.length - pos, "utf-8" );
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
-
- return pos;
- }
-
- }
5. PingAckProtocol - import com.shandiangou.sdgprotocol.lib.ProtocolException;
- import com.shandiangou.sdgprotocol.lib.SocketUtil;
-
- import java.io.ByteArrayOutputStream;
- import java.io.UnsupportedEncodingException;
-
- /**
- * Created by meishan on 16/12/1.
- */
- public class PingAckProtocol extends BasicProtocol {
-
- public static final int PROTOCOL_TYPE = 3;
-
- private static final int ACKPINGID_LEN = 4;
-
- private int ackPingId;
-
- private String unused;
-
- @Override
- public int getLength() {
- return super.getLength() + ACKPINGID_LEN + unused.getBytes().length;
- }
-
- @Override
- public int getProtocolType() {
- return PROTOCOL_TYPE;
- }
-
- public int getAckPingId() {
- return ackPingId;
- }
-
- public void setAckPingId( int ackPingId) {
- this.ackPingId = ackPingId;
- }
-
- public String getUnused() {
- return unused;
- }
-
- public void setUnused(String unused) {
- this.unused = unused;
- }
-
- /**
- * Splice and send data
- *
- * @return
- */
- @Override
- public byte[] genContentData() {
- byte[] base = super.genContentData();
- byte[] ackPingId = SocketUtil.int2ByteArrays(this.ackPingId);
- byte[] unused = this.unused.getBytes();
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream(getLength());
- baos.write(base, 0, base.length); //protocol version + data type + data length + message id
- baos.write(ackPingId, 0, ACKPINGID_LEN); //message id
- baos.write(unused, 0, unused.length); //unused
- return baos.toByteArray();
- }
-
- @Override
- public int parseContentData(byte[] data) throws ProtocolException {
- int pos = super.parseContentData(data);
-
- //Parse ackPingId
- ackPingId = SocketUtil.byteArrayToInt(data, pos, ACKPINGID_LEN);
- pos += ACKPINGID_LEN;
-
- //Parse unused
- try {
- unused = new String(data, pos, data.length - pos, "utf-8" );
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
-
- return pos;
- }
-
- }
3. Task Scheduling The above four protocols have been implemented. Next, we will use them to implement the communication between the app and the server. Here we use one thread to implement data sending, receiving and heartbeat respectively, as follows: 1. Client - import android.os.Handler;
- import android.os.Looper;
- import android.os.Message;
- import android.util.Log;
-
- import com.shandiangou.sdgprotocol.lib.protocol.BasicProtocol;
- import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
- import com.shandiangou.sdgprotocol.lib.protocol.PingProtocol;
-
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.ConnectException;
- import java.net.Socket;
- import java.util.concurrent.ConcurrentLinkedQueue;
-
- import javax.net.SocketFactory;
-
- /**
- * Write data in an infinite loop, wait when there is no data, and notify when there is new message
- * <p>
- * Created by meishan on 16/12/1.
- */
- public class ClientRequestTask implements Runnable {
-
- private static final int SUCCESS = 100;
- private static final int FAILED = -1;
-
- private boolean isLongConnection = true ;
- private Handler mHandler;
- private SendTask mSendTask;
- private ReciveTask mReciveTask;
- private HeartBeatTask mHeartBeatTask;
- private Socket mSocket;
-
- private boolean isSocketAvailable;
- private boolean closeSendTask;
-
- protected volatile ConcurrentLinkedQueue<BasicProtocol> dataQueue = new ConcurrentLinkedQueue<>();
-
- public ClientRequestTask(RequestCallBack requestCallBacks) {
- mHandler = new MyHandler(requestCallBacks);
- }
-
- @Override
- public void run() {
- try {
- try {
- mSocket = SocketFactory.getDefault().createSocket(Config.ADDRESS, Config.PORT);
- // mSocket.setSoTimeout(10);
- } catch (ConnectException e) {
- failedMessage(-1, "Server connection abnormality, please check the network" );
- return ;
- }
-
- isSocketAvailable = true ;
-
- //Start receiving thread
- mReciveTask = new ReciveTask();
- mReciveTask.inputStream = mSocket.getInputStream();
- mReciveTask.start();
-
- //Start sending thread
- mSendTask = new SendTask();
- mSendTask.outputStream = mSocket.getOutputStream();
- mSendTask.start();
-
- //Start the heartbeat thread
- if (isLongConnection) {
- mHeartBeatTask = new HeartBeatTask();
- mHeartBeatTask.outputStream = mSocket.getOutputStream();
- mHeartBeatTask.start();
- }
- } catch (IOException e) {
- failedMessage(-1, "Network exception, please try again later" );
- e.printStackTrace();
- }
- }
-
- public void addRequest(DataProtocol data) {
- dataQueue.add (data);
- toNotifyAll(dataQueue); //If there is new data to be sent, wake up the sending thread
- }
-
- public synchronized void stop() {
-
- //Close the receiving thread
- closeReciveTask();
-
- //Close the sending thread
- closeSendTask = true ;
- toNotifyAll(dataQueue);
-
- //Close the heartbeat thread
- closeHeartBeatTask();
-
- //Close the socket
- closeSocket();
-
- // Clear data
- clearData();
-
- failedMessage(-1, "disconnected" );
- }
-
- /**
- * Close the receiving thread
- */
- private void closeReciveTask() {
- if (mReciveTask != null ) {
- mReciveTask.interrupt();
- mReciveTask.isCancle = true ;
- if (mReciveTask.inputStream != null ) {
- try {
- if (isSocketAvailable && !mSocket.isClosed() && mSocket.isConnected()) {
- mSocket.shutdownInput(); //To solve the java.net.SocketException problem, you need to shut downInput first
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- SocketUtil.closeInputStream(mReciveTask.inputStream);
- mReciveTask.inputStream = null ;
- }
- mReciveTask = null ;
- }
- }
-
- /**
- * Close the sending thread
- */
- private void closeSendTask() {
- if (mSendTask != null ) {
- mSendTask.isCancle = true ;
- mSendTask.interrupt();
- if (mSendTask.outputStream != null ) {
- synchronized (mSendTask.outputStream) {//Prevent stopping when writing data, stop after writing
- SocketUtil.closeOutputStream(mSendTask.outputStream);
- mSendTask.outputStream = null ;
- }
- }
- mSendTask = null ;
- }
- }
-
- /**
- * Close the heartbeat thread
- */
- private void closeHeartBeatTask() {
- if (mHeartBeatTask != null ) {
- mHeartBeatTask.isCancle = true ;
- if (mHeartBeatTask.outputStream != null ) {
- SocketUtil.closeOutputStream(mHeartBeatTask.outputStream);
- mHeartBeatTask.outputStream = null ;
- }
- mHeartBeatTask = null ;
- }
- }
-
- /**
- * Close the socket
- */
- private void closeSocket() {
- if (mSocket != null ) {
- try {
- mSocket.close ();
- isSocketAvailable = false ;
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * Clear data
- */
- private void clearData() {
- dataQueue.clear();
- isLongConnection = false ;
- }
-
- private void toWait(Object o) {
- synchronized (o) {
- try {
- o.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * After notify() is called, the object lock is not released immediately, but after the corresponding synchronized(){} statement block is executed, the lock is automatically released.
- *
- * @param o
- */
- protected void toNotifyAll(Object o) {
- synchronized (o) {
- o.notifyAll();
- }
- }
-
- private void failedMessage( int code, String msg) {
- Message message = mHandler.obtainMessage(FAILED);
- message.what = FAILED;
- message.arg1 = code;
- message.obj = msg;
- mHandler.sendMessage(message);
- }
-
- private void successMessage(BasicProtocol protocol) {
- Message message = mHandler.obtainMessage(SUCCESS);
- message.what = SUCCESS;
- message.obj = protocol;
- mHandler.sendMessage(message);
- }
-
- private boolean isConnected() {
- if (mSocket.isClosed() || !mSocket.isConnected()) {
- ClientRequestTask.this.stop();
- return false ;
- }
- return true ;
- }
-
- /**
- * Server returns processing, the main thread runs
- */
- public class MyHandler extends Handler {
-
- private RequestCallBack mRequestCallBack;
-
- public MyHandler(RequestCallBack callBack) {
- super(Looper.getMainLooper());
- this.mRequestCallBack = callBack;
- }
-
- @Override
- public void handleMessage(Message msg) {
- super.handleMessage(msg);
- switch (msg.what) {
- case SUCCESS:
- mRequestCallBack.onSuccess((BasicProtocol) msg.obj);
- break;
- case FAILED:
- mRequestCallBack.onFailed(msg.arg1, (String) msg.obj);
- break;
- default :
- break;
- }
- }
- }
-
- /**
- * Data receiving thread
- */
- public class ReciveTask extends Thread {
-
- private boolean isCancle = false ;
- private InputStream inputStream;
-
- @Override
- public void run() {
- while (!isCancle) {
- if (!isConnected()) {
- break;
- }
-
- if (inputStream != null ) {
- BasicProtocol reciverData = SocketUtil.readFromStream(inputStream);
- if (reciverData != null ) {
- if (reciverData.getProtocolType() == 1 || reciverData.getProtocolType() == 3) {
- successMessage(reciverData);
- }
- } else {
- break;
- }
- }
- }
-
- SocketUtil.closeInputStream(inputStream); // Exit the input stream when the loop ends
- }
- }
-
- /**
- * Data sending thread
- * Make the thread wait when no data is sent
- */
- public class SendTask extends Thread {
-
- private boolean isCancle = false ;
- private OutputStream outputStream;
-
- @Override
- public void run() {
- while (!isCancle) {
- if (!isConnected()) {
- break;
- }
-
- BasicProtocol dataContent = dataQueue.poll();
- if (dataContent == null ) {
- toWait(dataQueue); //Wait if no data is sent
- if (closeSendTask) {
- closeSendTask(); //After notify() is called, the object lock is not released immediately, so the sending thread is interrupted here
- }
- } else if (outputStream != null ) {
- synchronized (outputStream) {
- SocketUtil.write2Stream(dataContent, outputStream);
- }
- }
- }
-
- SocketUtil.closeOutputStream(outputStream); //Exit the output stream when the loop ends
- }
- }
-
- /**
- * Heartbeat implementation, frequency 5 seconds
- * Created by meishan on 16/12/1.
- */
- public class HeartBeatTask extends Thread {
-
- private static final int REPEATTIME = 5000;
- private boolean isCancle = false ;
- private OutputStream outputStream;
- private int pingId;
-
- @Override
- public void run() {
- pingId = 1;
- while (!isCancle) {
- if (!isConnected()) {
- break;
- }
-
- try {
- mSocket.sendUrgentData(0xFF);
- } catch (IOException e) {
- isSocketAvailable = false ;
- ClientRequestTask.this.stop();
- break;
- }
-
- if (outputStream != null ) {
- PingProtocol pingProtocol = new PingProtocol();
- pingProtocol.setPingId(pingId);
- pingProtocol.setUnused( "ping..." );
- SocketUtil.write2Stream(pingProtocol, outputStream);
- pingId = pingId + 2;
- }
-
- try {
- Thread.sleep(REPEATTIME);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- SocketUtil.closeOutputStream(outputStream);
- }
- }
- }
The RequestCallBack interface involved is as follows: - /**
- * Created by meishan on 16/12/1.
- */
- public interface RequestCallBack {
-
- void onSuccess(BasicProtocol msg);
-
- void onFailed( int errorCode, String msg);
- }
2. Server - import java.io.DataInputStream;
- import java.io.DataOutputStream;
- import java.net.Socket;
- import java.util.Iterator;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ConcurrentLinkedQueue;
-
- /**
- * Created by meishan on 16/12/1.
- */
- public class ServerResponseTask implements Runnable {
-
- private ReciveTask reciveTask;
- private SendTask sendTask;
- private Socket socket;
- private ResponseCallback tBack;
-
- private volatile ConcurrentLinkedQueue<BasicProtocol> dataQueue = new ConcurrentLinkedQueue<>();
- private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();
-
- private String userIP;
-
- public String getUserIP() {
- return userIP;
- }
-
- public ServerResponseTask(Socket socket, ResponseCallback tBack) {
- this.socket = socket;
- this.tBack = tBack;
- this.userIP = socket.getInetAddress().getHostAddress();
- System.out.println ( "User IP address: " + userIP);
- }
-
- @Override
- public void run() {
- try {
- //Start receiving thread
- reciveTask = new ReciveTask();
- reciveTask.inputStream = new DataInputStream(socket.getInputStream());
- reciveTask.start();
-
- //Start sending thread
- sendTask = new SendTask();
- sendTask.outputStream = new DataOutputStream(socket.getOutputStream());
- sendTask.start();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void stop() {
- if (reciveTask != null ) {
- reciveTask.isCancle = true ;
- reciveTask.interrupt();
- if (reciveTask.inputStream != null ) {
- SocketUtil.closeInputStream(reciveTask.inputStream);
- reciveTask.inputStream = null ;
- }
- reciveTask = null ;
- }
-
- if (sendTask != null ) {
- sendTask.isCancle = true ;
- sendTask.interrupt();
- if (sendTask.outputStream != null ) {
- synchronized (sendTask.outputStream) {//Prevent stopping when writing data, stop after writing
- sendTask.outputStream = null ;
- }
- }
- sendTask = null ;
- }
- }
-
- public void addMessage(BasicProtocol data) {
- if (!isConnected()) {
- return ;
- }
-
- dataQueue.offer(data);
- toNotifyAll(dataQueue); //If there is new data to be sent, wake up the sending thread
- }
-
- public Socket getConnectdClient(String clientID) {
- return onLineClient.get(clientID);
- }
-
- /**
- * Print the connected clients
- */
- public static void printAllClient() {
- if (onLineClient == null ) {
- return ;
- }
- Iterator<String> inter = onLineClient.keySet().iterator();
- while (inter.hasNext()) {
- System. out .println( "client:" + inter. next ());
- }
- }
-
- public void toWaitAll(Object o) {
- synchronized (o) {
- try {
- o.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- public void toNotifyAll(Object obj) {
- synchronized (obj) {
- obj.notifyAll();
- }
- }
-
- private boolean isConnected() {
- if (socket.isClosed() || !socket.isConnected()) {
- onLineClient.remove(userIP);
- ServerResponseTask.this.stop();
- System. out .println( "socket closed..." );
- return false ;
- }
- return true ;
- }
-
- public class ReciveTask extends Thread {
-
- private DataInputStream inputStream;
- private boolean isCancle;
-
- @Override
- public void run() {
- while (!isCancle) {
- if (!isConnected()) {
- isCancle = true ;
- break;
- }
-
- BasicProtocol clientData = SocketUtil.readFromStream(inputStream);
-
- if (clientData != null ) {
- if (clientData.getProtocolType() == 0) {
- System. out .println( "dtype: " + ((DataProtocol) clientData).getDtype() + ", pattion: " + ((DataProtocol) clientData).getPattion() + ", msgId: " + ((DataProtocol) clientData).getMsgId() + ", data: " + ((DataProtocol) clientData).getData());
-
- DataAckProtocol dataAck = new DataAckProtocol();
- dataAck.setUnused( "Received message: " + ((DataProtocol) clientData).getData());
- dataQueue.offer(dataAck);
- toNotifyAll(dataQueue); //Wake up the sending thread
-
- tBack.targetIsOnline(userIP);
- } else if (clientData.getProtocolType() == 2) {
- System. out .println( "pingId: " + ((PingProtocol) clientData).getPingId());
-
- PingAckProtocol pingAck = new PingAckProtocol();
- pingAck.setUnused( "heartbeat received" );
- dataQueue.offer(pingAck);
- toNotifyAll(dataQueue); //Wake up the sending thread
-
- tBack.targetIsOnline(userIP);
- }
- } else {
- System. out .println( "client is offline..." );
- break;
- }
- }
-
- SocketUtil.closeInputStream(inputStream);
- }
- }
-
- public class SendTask extends Thread {
-
- private DataOutputStream outputStream;
- private boolean isCancle;
-
- @Override
- public void run() {
- while (!isCancle) {
- if (!isConnected()) {
- isCancle = true ;
- break;
- }
-
- BasicProtocol procotol = dataQueue.poll();
- if (procotol == null ) {
- toWaitAll(dataQueue);
- } else if (outputStream != null ) {
- synchronized (outputStream) {
- SocketUtil.write2Stream(procotol, outputStream);
- }
- }
- }
-
- SocketUtil.closeOutputStream(outputStream);
- }
- }
The ResponseCallback interface involved is as follows: - /**
- * Created by meishan on 16/12/1.
- */
- public interface ResponseCallback {
-
- void targetIsOffline(DataProtocol reciveMsg);
-
- void targetIsOnline(String clientIp);
- }
The above code handles exceptions in several situations. For example, after the connection is established, the server stops running. At this time, the client's input stream is still blocked. How to ensure that the client does not throw an exception? These processes can be combined with the SocketUtil class. 4. Call Encapsulation 1. Client import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol; - import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
-
- /**
- * Created by meishan on 16/12/1.
- */
- public class ConnectionClient {
-
- private boolean isClosed;
-
- private ClientRequestTask mClientRequestTask;
-
- public ConnectionClient(RequestCallBack requestCallBack) {
- mClientRequestTask = new ClientRequestTask(requestCallBack);
- new Thread(mClientRequestTask).start();
- }
-
- public void addNewRequest(DataProtocol data) {
- if (mClientRequestTask != null && !isClosed)
- mClientRequestTask.addRequest(data);
- }
-
- public void closeConnect() {
- isClosed = true ;
- mClientRequestTask.stop();
- }
- }
2. Server - import com.shandiangou.sdgprotocol.lib.protocol.DataProtocol;
-
- import java.io.IOException;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- /**
- * Created by meishan on 16/12/1.
- */
- public class ConnectionServer {
-
- private static boolean isStart = true ;
- private static ServerResponseTask serverResponseTask;
-
- public ConnectionServer() {
-
- }
-
- public static void main(String[] args) {
-
- ServerSocket serverSocket = null ;
- ExecutorService executorService = Executors.newCachedThreadPool();
- try {
- serverSocket = new ServerSocket(Config.PORT);
- while (isStart) {
- Socket socket = serverSocket.accept();
- serverResponseTask = new ServerResponseTask(socket,
- new ResponseCallback() {
-
- @Override
- public void targetIsOffline(DataProtocol reciveMsg) {// The other party is not online
- if (reciveMsg != null ) {
- System. out .println(reciveMsg.getData());
- }
- }
-
- @Override
- public void targetIsOnline(String clientIp) {
- System. out .println(clientIp + " is onLine" );
- System. out .println( "-----------------------------------------" );
- }
- });
-
- if (socket.isConnected()) {
- executorService.execute (serverResponseTask);
- }
- }
-
- serverSocket.close () ;
-
- } catch (IOException e) {
- e.printStackTrace();
- finally
- if (serverSocket != null ) {
- try {
- isStart = false ;
- serverSocket.close () ;
- if (serverSocket != null )
- serverResponseTask.stop();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
Summarize The key to implementing a custom protocol lies in the assembly and parsing of the protocol. The key code has been given above. If you need to view the complete code and demo, you can download the source code. Note: Run the main function of the server demo first, then check the IP address of the local machine, and then modify the IP address in Config.java in the client (android) code. Of course, make sure that the Android phone and the server are in the same local area, and then open the client. |