1
package basic.util.socket;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.FileChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import basic.util.FileUtil;
import basic.util.log.Log;
public class BaseSocket {
protected static final String AccessChannelFailed = "{0} to {1}:{2} failed";
protected static final String ExceptionWrong = "Exception.Wrong";
protected static final int ReadTimeOut = 20;
// Allocate a byte buffer (4K)
public static final int BufferSize = 4 * 1024;
final private int port;
public BaseSocket(int port) {
this.port = port;
}
protected int getPort(){
return port;
}
protected static String readBufferData(ByteBuffer byteBuffer, int bytesRead) {
// Make the buffer ready to read
byteBuffer.flip();
// Convert the buffer into a line
byte[] lineBytes = new byte[bytesRead];
byteBuffer.get(lineBytes, 0, bytesRead);
byteBuffer.clear();
String line = new String(lineBytes);
return line;
}
protected static String readFromSocket(AsynchronousSocketChannel channel, ByteBuffer byteBuffer)
throws InterruptedException, ExecutionException, TimeoutException {
int bytesRead;
StringBuilder builder = new StringBuilder();
byteBuffer.clear();
bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);
while (bytesRead != -1) {
String line = readBufferData(byteBuffer, bytesRead);
if (Command.Bye.getValue().equals(line)) {
break;
}
System.out.println("Message: " + line);
builder.append(line);
// Read the next line
bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);
}
return builder.toString();
}
protected static void writeChannel(AsynchronousSocketChannel channel, String value) throws InterruptedException {
Future<Integer> future = channel.write( ByteBuffer.wrap(value.getBytes() ) );
while(!future.isDone()) {
Thread.sleep(1);
}
}
protected void closeChannel(AsynchronousSocketChannel channel) {
try {
// Close the connection if we need to
if (channel != null && channel.isOpen()) {
channel.close();
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
protected static void readToFile(AsynchronousSocketChannel channel, ByteBuffer byteBuffer, String path)
throws InterruptedException, ExecutionException, TimeoutException, IOException,
FileNotFoundException {
File file = FileUtil.getFile(path);
int bytesRead;
try (FileOutputStream fos = new FileOutputStream(file);
FileChannel outChannel = fos.getChannel();) {
//byteBuffer.clear();
bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);
while (bytesRead != -1) {
byteBuffer.flip();
byte[] lineBytes = new byte[bytesRead];
byteBuffer.get(lineBytes, 0, bytesRead);
String line = new String(lineBytes);
System.out.println("bytesRead:" + bytesRead);
System.out.println("data:" + line);
if (Command.Bye.getValue().equals(line)) {
break;
}
//outChannel.write(byteBuffer);
outChannel.write(ByteBuffer.wrap( lineBytes ));
byteBuffer.clear();
// Read the next line
bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);
}
}
}
protected static void readFromFile(AsynchronousSocketChannel channel, ByteBuffer byteBuffer, String path)
throws InterruptedException {
File file = FileUtil.getFile(path);
int bytesRead;
try (FileInputStream fis = new FileInputStream(file);
FileChannel inChannel = fis.getChannel();) {
byteBuffer.clear();
bytesRead = inChannel.read(byteBuffer);
while (bytesRead != -1) {
byteBuffer.flip();
byte[] lineBytes = new byte[bytesRead];
byteBuffer.get(lineBytes, 0, bytesRead);
String line = new String(lineBytes);
System.out.println("data:" + line);
//channel.write(ByteBuffer.wrap(lineBytes));
writeChannel(channel, line);
byteBuffer.clear();
bytesRead = inChannel.read(byteBuffer);
}
} catch (IOException exception) {
Log.severe(exception);
}
}
}
2
package basic.util.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import basic.util.exception.BaseException;
import basic.util.log.Log;
public class ServerSocket extends BaseSocket {
private AsynchronousServerSocketChannel listener;
public AsynchronousServerSocketChannel getListener() {
return listener;
}
public ServerSocket(int port) {
super(port);
}
public void run(ServerSocketHandler serverSocket) {
try {
// Create an AsynchronousServerSocketChannel that will listen on port 5000
listener = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(getPort()));
} catch (IOException exception) {
BaseException.throwException(exception);
}
// Listen for a new request
listener.accept(serverSocket, new CompletionHandler<AsynchronousSocketChannel, ServerSocketHandler>() {
@Override
public void completed(AsynchronousSocketChannel channel, ServerSocketHandler attachment) {
try {
Log.info("Connect from: " + channel.getRemoteAddress());
} catch (IOException e) {
Log.severe(e);
}
// Accept the next connection
listener.accept(null, this);
// Ready for the client
channel.write(ByteBuffer.wrap(Command.ready.getValue().getBytes()));
// Allocate a byte buffer (4K) to read from the client
ByteBuffer byteBuffer = ByteBuffer.allocate(BufferSize);
try {
// Read the first line
int bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);
String cmd = readBufferData(byteBuffer, bytesRead);
Command.match(cmd).ifPresent(command -> command.serverAction(channel, byteBuffer, attachment));
/*if (Command.getServerData.getValue().equals(cmd)) {
readFromFile(channel, byteBuffer, attachment.getDataFilePath());
} else if (Command.sendServerResult.getValue().equals(cmd)) {
String content = readFromSocket(channel, byteBuffer);
FileUtil.appendJsonFile(content, attachment.getDoneResultFilePath());
}*/
} catch (InterruptedException | ExecutionException exception) {
Log.severe(exception);
} catch (TimeoutException exception) {
channel.write(ByteBuffer.wrap(Command.Bye.getValue().getBytes()));
Log.severe(exception);
}
System.out.println("End of conversation");
closeChannel(channel);
}
@Override
public void failed(Throwable exception, ServerSocketHandler handler) {
// Close this channel would cause AsynchronousCloseException
// directly return to avoid noise
if (exception instanceof AsynchronousCloseException) return;
Log.severe(exception);
}
});
}
}
3
package basic.util.socket;
import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
public class BaseClientSocket extends BaseSocket {
final private String host;
private AsynchronousSocketChannel channel;
public BaseClientSocket(String host, int port) {
super(port);
this.host = host;
}
protected String getHost() {
return host;
}
protected AsynchronousSocketChannel getChannel() {
return channel;
}
protected void openChannel() {
try {
channel = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
protected void closeChannel() {
closeChannel(getChannel());
}
}
4
package basic.util.socket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.text.MessageFormat;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import basic.util.log.Log;
public class ClientSocket extends BaseClientSocket implements CompletionHandler<Void, ClientSocketHandler> {
private static final String RetrieveFromServer = "Button.RetrieveFromServer";
private Command command;
public ClientSocket(String serverHost, int port) {
super(serverHost, port);
}
public Command getCommand() {
return command;
}
public void run(ClientSocketHandler clientSocketHandler, Command command) {
this.command = command;
openChannel();
getChannel().connect(new InetSocketAddress(getHost(), getPort()), clientSocketHandler, this);
}
@Override
public void completed(Void result, ClientSocketHandler attachment) {
Log.info("Connected to: " + getHost() + " " + getPort());
ByteBuffer byteBuffer = ByteBuffer.allocate(BufferSize);
try {
// Read the first line
int bytesRead = getChannel().read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);
String readyCommand = readBufferData(byteBuffer, bytesRead);
if (Command.ready.getValue().equals(readyCommand)) {
writeChannel(getChannel(), command.getValue());
Command.match(command.getValue()).ifPresent(command -> command.clientAction(getChannel(), byteBuffer, attachment));
/*if (Command.getServerData.getValue().equals(command.getValue())) {
readToFile(getChannel(), byteBuffer, attachment.getData());
} else if (Command.sendServerResult.getValue().equals(command.getValue())) {
writeChannel(getChannel(), attachment.getData());
writeChannel(getChannel(), Command.Bye.getValue());
}*/
} else {
String msg = MessageFormat.format(ExceptionWrong, "Command", readyCommand);
attachment.retainThrowable(AccessChannelFailed, new Exception(msg), RetrieveFromServer, getHost(), Integer.toString(getPort()));
}
} catch (InterruptedException | ExecutionException exception) {
attachment.retainThrowable(AccessChannelFailed, exception, RetrieveFromServer, getHost(), Integer.toString(getPort()));
} catch (TimeoutException exception) {
getChannel().write( ByteBuffer.wrap(Command.Bye.getValue().getBytes() ) );
attachment.retainThrowable(AccessChannelFailed, exception, RetrieveFromServer, getHost(), Integer.toString(getPort()));
} /*catch (IOException exception) {
attachment.retainThrowable(BusinessException.Failed, BusinessException.ExceptionFailed, exception, "Write", "Data File", attachment.getData());
} */
attachment.setReady(true);
closeChannel();
}
@Override
public void failed(Throwable exception, ClientSocketHandler attachment) {
attachment.retainThrowable(AccessChannelFailed, exception, RetrieveFromServer, getHost(), Integer.toString(getPort()));
closeChannel();
attachment.setReady(true);
}
}