服务端:接收客户端发送的消息,并进行转发。
package socket.demo2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
 * Chat server: accepts client connections and relays the messages they send.
 *
 * <p>One {@link Selector} multiplexes the listening channel and every client
 * channel, and all work happens on the thread that calls {@link #listen()}.
 * The class does no synchronization of its own, so it must be driven from a
 * single thread.
 *
 * <p>A client message is whatever one read returns, split on {@code ###}:
 * <ul>
 *   <li>{@code nickname} - register {@code nickname} for this connection</li>
 *   <li>{@code nickname###text} - broadcast {@code text} to every client</li>
 *   <li>{@code nickname###text###target} - deliver {@code text} to {@code target} only</li>
 * </ul>
 *
 * @author 一池春水倾半城
 * @date 2019/10/22
 */
public class Server {
    /** Separator between the fields of a client message. */
    private static final String FIELD_SEPARATOR = "###";

    /** Consecutive zero-byte writes tolerated before a client is treated as stalled. */
    private static final int MAX_STALLED_WRITES = 1000;

    private final Selector selector;

    // Nickname of every registered client, keyed by its remote address string.
    // Its size doubles as the "online" counter.
    private final Map<String, String> users = new HashMap<>();

    // Scratch buffer for inbound data only; outbound data gets its own buffer in write().
    ByteBuffer buffer = ByteBuffer.allocate(2048);

    /**
     * Opens the listening channel on {@code port} and registers it with a new selector.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel or selector cannot be opened, bound or registered;
     *                     anything opened so far is closed before the exception propagates
     */
    public Server(int port) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        Selector opened = null;
        try {
            server.bind(new InetSocketAddress(port));
            // Selectors only work with non-blocking channels.
            server.configureBlocking(false);
            opened = Selector.open();
            server.register(opened, SelectionKey.OP_ACCEPT);
        } catch (IOException | RuntimeException e) {
            // Do not leak the descriptors when start-up fails (e.g. port already in use).
            try {
                if (opened != null) {
                    opened.close();
                }
                server.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        selector = opened;
        System.out.println("服务端启动...");
    }

    /**
     * Runs the event loop: waits for ready keys and dispatches each one to
     * {@link #handle(SelectionKey)}. Does not return normally.
     *
     * @throws IOException if the selector fails or accepting a connection fails
     */
    public void listen() throws IOException {
        while (true) {
            selector.select();
            Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
            while (ready.hasNext()) {
                SelectionKey key = ready.next();
                // Selected keys are never removed automatically; drop each one once taken.
                ready.remove();
                handle(key);
            }
        }
    }

    /**
     * Dispatches one ready key: a new connection or inbound data from a client.
     *
     * @param key the selected key
     * @throws IOException if accepting a connection fails
     */
    private void handle(SelectionKey key) throws IOException {
        // A client handled earlier in the same batch may have caused this one to be
        // closed (failed broadcast); querying a cancelled key would throw.
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) {
            acceptClient((ServerSocketChannel) key.channel());
        } else if (key.isReadable()) {
            readFrom((SocketChannel) key.channel());
        }
    }

    /** Accepts a pending connection, registers it for reads and greets it. */
    private void acceptClient(ServerSocketChannel server) throws IOException {
        SocketChannel client = server.accept();
        if (client == null) {
            // Non-blocking accept returns null when no connection is pending.
            return;
        }
        String address = addressOf(client);
        try {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            System.out.println("接收到来自 " + address + " 的新连接!");
            boardMsg("当前在线人数:" + users.size());
            write("\n欢迎来到本聊天室,请输入昵称:", client);
        } catch (IOException e) {
            // A connection that dies during the greeting must not take the server down.
            disconnect(client, address);
        }
    }

    /** Reads one message from {@code client} and acts on it according to its field count. */
    private void readFrom(SocketChannel client) throws IOException {
        // Captured up front: the address can no longer be queried once the channel is closed.
        String address = addressOf(client);
        try {
            String text = rec(client);
            if (text.isEmpty()) {
                // Nothing was read; an empty string must not be registered as a nickname.
                return;
            }
            String[] msg = text.split(FIELD_SEPARATOR);
            if (msg.length == 1) { // nickname registration
                if (users.containsValue(msg[0])) {
                    write("昵称重复,请重新输入!", client);
                } else {
                    users.put(address, msg[0]);
                    write("hello " + msg[0], client);
                }
            } else if (msg.length == 2) { // broadcast
                System.out.println(address + " named " + msg[0] + " said to all: " + msg[1]);
                boardMsg(msg[0] + "说:" + msg[1]);
            } else if (msg.length == 3) { // private message
                System.out.println(address + " named " + msg[0] + " said to " + msg[2] + ": " + msg[1]);
                p2pChat(msg[0] + "说:" + msg[1], msg[2], client);
            }
        } catch (IOException | RuntimeException e) {
            // Per-client boundary: whatever goes wrong while serving this client
            // (EOF, reset, malformed input) costs this client only, never the server.
            disconnect(client, address);
        }
    }

    /**
     * Closes {@code client}, forgets its nickname and announces the departure.
     * Calling it again for an already closed channel announces nothing.
     */
    private void disconnect(SocketChannel client, String address) throws IOException {
        if (!client.isOpen()) {
            users.remove(address);
            return;
        }
        System.out.println(address + " 断开了连接!");
        try {
            client.close();
        } catch (IOException ignored) {
            // Best effort: the channel is marked closed even when close() fails.
        }
        String name = users.remove(address);
        boardMsg("用户 " + name + " 离开了!当前在线人数:" + users.size());
    }

    /** Returns the remote address of {@code channel} as text, or null if it cannot be queried. */
    private static String addressOf(SocketChannel channel) {
        try {
            return String.valueOf(channel.getRemoteAddress());
        } catch (IOException e) {
            return null;
        }
    }

    /**
     * Reads whatever is currently available on {@code channel} and decodes it as UTF-8.
     *
     * @param channel the client channel to read from
     * @return the decoded text; empty when no bytes were available
     * @throws IOException if the read fails or the peer has closed the connection
     */
    private String rec(SocketChannel channel) throws IOException {
        buffer.clear();
        int count = channel.read(buffer);
        if (count < 0) {
            // read() signals end-of-stream with -1; report it instead of decoding a negative length.
            throw new IOException("peer closed the connection");
        }
        return new String(buffer.array(), 0, count, StandardCharsets.UTF_8);
    }

    /**
     * Sends {@code msg} to {@code channel} as UTF-8, completely or not at all.
     *
     * <p>The message is wrapped in its own buffer, so its size is not limited by the
     * read buffer. A non-blocking write may accept only part of the data, so the call
     * repeats until everything is sent. A peer that accepts nothing for
     * {@code MAX_STALLED_WRITES} consecutive attempts is reported as failed rather than
     * allowed to stall the event loop.
     *
     * @param msg     text to send
     * @param channel destination channel
     * @throws IOException if the write fails or the peer stops draining its socket
     */
    private void write(String msg, SocketChannel channel) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
        int stalled = 0;
        while (out.hasRemaining()) {
            if (channel.write(out) > 0) {
                stalled = 0;
            } else if (++stalled > MAX_STALLED_WRITES) {
                throw new IOException("client is not draining its socket: " + channel);
            } else {
                Thread.yield();
            }
        }
    }

    /**
     * Broadcasts {@code msg} to every open client channel (group chat).
     *
     * <p>A recipient whose write fails is disconnected; the remaining recipients
     * still receive the message.
     *
     * @param msg text to broadcast
     * @throws IOException declared for compatibility; per-recipient failures are handled here
     */
    private void boardMsg(String msg) throws IOException {
        // Closing a channel inside this loop is safe: the key is only cancelled and
        // leaves the key set during the next select().
        for (SelectionKey key : selector.keys()) {
            Channel channel = key.channel();
            if (!channel.isOpen() || !(channel instanceof SocketChannel)) {
                continue;
            }
            SocketChannel target = (SocketChannel) channel;
            String address = addressOf(target);
            try {
                write(msg, target);
            } catch (IOException e) {
                disconnect(target, address);
            }
        }
    }

    /**
     * Delivers {@code msg} to the client registered as {@code targetName} and echoes it
     * to {@code source}; tells {@code source} when no such client can be reached.
     *
     * @param msg        text to deliver
     * @param targetName nickname of the recipient
     * @param source     channel of the sender
     * @throws IOException if writing to {@code source} fails
     */
    private void p2pChat(String msg, String targetName, SocketChannel source) throws IOException {
        for (SelectionKey key : selector.keys()) {
            Channel channel = key.channel();
            if (!channel.isOpen() || !(channel instanceof SocketChannel)) {
                continue;
            }
            SocketChannel target = (SocketChannel) channel;
            // Connections that have not registered a nickname map to null, so compare
            // from the non-null side.
            String name = users.get(addressOf(target));
            if (!targetName.equals(name)) {
                continue;
            }
            try {
                write(msg, target);
            } catch (IOException e) {
                // The recipient is gone: drop it and report "not found" to the sender.
                disconnect(target, addressOf(target));
                break;
            }
            write(msg, source);
            return;
        }
        write("找不到该用户!", source);
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server(7777);
        server.listen();
    }
}
客户端:发送消息和读取消息。
package socket.demo2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
 * Console chat client.
 *
 * <p>Two threads share one non-blocking connection to the server: a reader thread
 * prints everything the server sends, and the main thread forwards every line typed
 * on standard input. Until the server confirms a nickname, typed lines are sent
 * as-is (nickname attempts); afterwards each line is sent as a chat message.
 *
 * <p>The server's replies carry no framing, so a nickname confirmation is recognised
 * only when a received chunk starts with {@code "hello "}.
 *
 * @author 一池春水倾半城
 * @date 2019/10/22
 */
public class Client {
    /** Prefix of the server reply that confirms a nickname. */
    private static final String NICKNAME_ACK_PREFIX = "hello ";

    /** Separator between the fields of a message sent to the server. */
    private static final String FIELD_SEPARATOR = "###";

    // Inbound scratch buffer. Only rec() uses it and rec() runs on the reader thread
    // only; write() allocates its own buffer, so the two threads never share one.
    static ByteBuffer buffer = ByteBuffer.allocate(1024);

    // Whether the server has confirmed the nickname.
    volatile static boolean success = false;

    // The user's nickname.
    volatile static String name = "sxh";

    /**
     * Reads whatever is currently available on {@code channel} and decodes it as UTF-8.
     *
     * @param channel the connection to the server
     * @return the decoded text; empty when no bytes were available
     * @throws IOException if the read fails or the server has closed the connection
     */
    private static String rec(SocketChannel channel) throws IOException {
        buffer.clear();
        int count = channel.read(buffer);
        if (count < 0) {
            // read() signals end-of-stream with -1; report it instead of decoding a negative length.
            throw new IOException("server closed the connection");
        }
        return new String(buffer.array(), 0, count, StandardCharsets.UTF_8);
    }

    /**
     * Sends {@code msg} to {@code channel} as UTF-8.
     *
     * <p>The message gets its own buffer (no size limit, nothing shared with the reader
     * thread), and the non-blocking write is repeated until every byte is sent.
     *
     * @param msg     text to send
     * @param channel the connection to the server
     * @throws IOException if the write fails
     */
    private static void write(String msg, SocketChannel channel) throws IOException {
        ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
        while (out.hasRemaining()) {
            if (channel.write(out) == 0) {
                Thread.yield();
            }
        }
    }

    /**
     * Extracts the nickname from a server confirmation.
     *
     * @param msg text received from the server
     * @return the confirmed nickname, or null when {@code msg} is not a confirmation;
     *         chat text that merely contains "hello" is not a confirmation
     */
    static String parseNickname(String msg) {
        if (msg == null || !msg.startsWith(NICKNAME_ACK_PREFIX)) {
            return null;
        }
        return msg.substring(NICKNAME_ACK_PREFIX.length());
    }

    /**
     * Builds the message sent to the server for one typed line.
     *
     * <p>{@code text} becomes {@code sender###text} (broadcast) and {@code text@target}
     * becomes {@code sender###text###target} (private). Only the last {@code @} marks
     * the recipient, so text that itself contains {@code @} still produces three fields.
     *
     * @param sender the user's nickname
     * @param line   the line typed by the user
     * @return the message to put on the wire
     */
    static String toFrame(String sender, String line) {
        int at = line.lastIndexOf('@');
        String body = at < 0
                ? line
                : line.substring(0, at) + FIELD_SEPARATOR + line.substring(at + 1);
        return sender + FIELD_SEPARATOR + body;
    }

    /**
     * Reader-thread body: prints every chunk received from the server until the
     * connection or the selector is closed, then closes the channel.
     */
    private static void readLoop(Selector selector, SocketChannel channel) {
        try {
            while (selector.isOpen() && channel.isOpen()) {
                selector.select();
                Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    ready.remove();
                    if (!key.isValid() || !key.isReadable()) {
                        continue;
                    }
                    String msg = rec((SocketChannel) key.channel());
                    if (!success) {
                        String confirmed = parseNickname(msg);
                        if (confirmed != null) {
                            // Publish the name before the flag so the main thread never
                            // sees success == true together with the old name.
                            name = confirmed;
                            success = true;
                        }
                    }
                    System.out.println(msg);
                }
            }
        } catch (IOException e) {
            // Still open means the server side ended the conversation, not our own shutdown.
            if (channel.isOpen()) {
                System.out.println("与服务端的连接已断开!");
            }
        } catch (java.nio.channels.ClosedSelectorException e) {
            // The main thread shut down between the loop check and select(); nothing to do.
        } finally {
            try {
                channel.close();
            } catch (IOException ignored) {
                // Best effort on the way out.
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Closing the selector on exit also wakes the reader thread out of select().
        try (Selector selector = Selector.open();
             SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 7777))) {
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);

            // Reader thread: prints what the server sends.
            new Thread(() -> readLoop(selector, socketChannel), "chat-reader").start();

            // Main thread: forwards typed lines to the server.
            Scanner scanner = new Scanner(System.in);
            while (socketChannel.isOpen() && scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if (line.isEmpty()) {
                    // "name###" would be parsed by the server as a nickname registration.
                    continue;
                }
                if (!socketChannel.isOpen()) {
                    // The connection was lost while waiting for input.
                    break;
                }
                // Private chat is typed as [message]@[recipient].
                write(success ? toFrame(name, line) : line, socketChannel);
            }
        }
    }
}
知识兔