Sunday, October 11, 2020

Java Basic Util - NIO.2 - Socket Channel

 1

package basic.util.socket;


import java.io.File;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.AsynchronousSocketChannel;

import java.nio.channels.FileChannel;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;


import basic.util.FileUtil;

import basic.util.log.Log;


public class BaseSocket {

protected static final String AccessChannelFailed = "{0} to {1}:{2} failed";

protected static final String ExceptionWrong = "Exception.Wrong";

protected static final int ReadTimeOut = 20;


// Allocate a byte buffer (4K)

public static final int BufferSize = 4 * 1024;

final private int port;

public BaseSocket(int port) {

        this.port = port;

}

protected int getPort(){

return port;

}

protected static String readBufferData(ByteBuffer byteBuffer, int bytesRead) {

// Make the buffer ready to read

byteBuffer.flip();


// Convert the buffer into a line

byte[] lineBytes = new byte[bytesRead];

byteBuffer.get(lineBytes, 0, bytesRead);

byteBuffer.clear();

String line = new String(lineBytes);

return line;

}

protected static String readFromSocket(AsynchronousSocketChannel channel, ByteBuffer byteBuffer)

throws InterruptedException, ExecutionException, TimeoutException {

int bytesRead;

StringBuilder builder = new StringBuilder();

byteBuffer.clear();

bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);

while (bytesRead != -1) {

String line = readBufferData(byteBuffer, bytesRead);


if (Command.Bye.getValue().equals(line)) {

break;

}

System.out.println("Message: " + line);

builder.append(line);

// Read the next line

bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);

}

return builder.toString();

}

protected static void writeChannel(AsynchronousSocketChannel channel, String value) throws InterruptedException {

Future<Integer> future = channel.write( ByteBuffer.wrap(value.getBytes() ) );

while(!future.isDone()) {

    Thread.sleep(1);

}

}

protected void closeChannel(AsynchronousSocketChannel channel) {

try {

// Close the connection if we need to

if (channel != null && channel.isOpen()) {

channel.close();

}

} catch (IOException e1) {

e1.printStackTrace();

}

}

protected static void readToFile(AsynchronousSocketChannel channel, ByteBuffer byteBuffer, String path)

throws InterruptedException, ExecutionException, TimeoutException, IOException,

FileNotFoundException {

File file = FileUtil.getFile(path); 

int bytesRead;

try (FileOutputStream fos = new FileOutputStream(file);

FileChannel outChannel = fos.getChannel();) {


//byteBuffer.clear();

bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);

while (bytesRead != -1) {

byteBuffer.flip();

byte[] lineBytes = new byte[bytesRead];

byteBuffer.get(lineBytes, 0, bytesRead);

String line = new String(lineBytes);

System.out.println("bytesRead:" + bytesRead); 

System.out.println("data:" + line);

if (Command.Bye.getValue().equals(line)) {

break;

}

//outChannel.write(byteBuffer);

outChannel.write(ByteBuffer.wrap( lineBytes ));

byteBuffer.clear();

// Read the next line

bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);

}

}

}

protected static void readFromFile(AsynchronousSocketChannel channel, ByteBuffer byteBuffer, String path)

throws InterruptedException {

File file = FileUtil.getFile(path);

int bytesRead;

try (FileInputStream fis = new FileInputStream(file);

FileChannel inChannel = fis.getChannel();) {

byteBuffer.clear();

bytesRead = inChannel.read(byteBuffer);

while (bytesRead != -1) {

byteBuffer.flip();


byte[] lineBytes = new byte[bytesRead];

byteBuffer.get(lineBytes, 0, bytesRead);

String line = new String(lineBytes);

System.out.println("data:" + line);

//channel.write(ByteBuffer.wrap(lineBytes));

writeChannel(channel, line);


byteBuffer.clear();

bytesRead = inChannel.read(byteBuffer);

}

} catch (IOException exception) {

Log.severe(exception);

}

}

}

2
package basic.util.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import basic.util.exception.BaseException;
import basic.util.log.Log;

public class ServerSocket extends BaseSocket {
private AsynchronousServerSocketChannel listener;
public AsynchronousServerSocketChannel getListener() {
return listener;
}
public ServerSocket(int port) {
super(port);
}

public void run(ServerSocketHandler serverSocket) {
try {
// Create an AsynchronousServerSocketChannel that will listen on port 5000
listener = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(getPort()));
} catch (IOException exception) {
BaseException.throwException(exception);
}
// Listen for a new request
listener.accept(serverSocket, new CompletionHandler<AsynchronousSocketChannel, ServerSocketHandler>() {

@Override
public void completed(AsynchronousSocketChannel channel, ServerSocketHandler attachment) {
try {
Log.info("Connect from: " + channel.getRemoteAddress());
} catch (IOException e) {
Log.severe(e);
}
// Accept the next connection
listener.accept(null, this);

// Ready for the client
channel.write(ByteBuffer.wrap(Command.ready.getValue().getBytes()));

// Allocate a byte buffer (4K) to read from the client
ByteBuffer byteBuffer = ByteBuffer.allocate(BufferSize);
try {
// Read the first line
int bytesRead = channel.read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);
String cmd = readBufferData(byteBuffer, bytesRead);
                    Command.match(cmd).ifPresent(command -> command.serverAction(channel, byteBuffer, attachment));
/*if (Command.getServerData.getValue().equals(cmd)) {
readFromFile(channel, byteBuffer, attachment.getDataFilePath());
} else if (Command.sendServerResult.getValue().equals(cmd)) {
String content = readFromSocket(channel, byteBuffer);
FileUtil.appendJsonFile(content, attachment.getDoneResultFilePath());
}*/
} catch (InterruptedException | ExecutionException exception) {
Log.severe(exception);
} catch (TimeoutException exception) {
channel.write(ByteBuffer.wrap(Command.Bye.getValue().getBytes()));
Log.severe(exception);
}

System.out.println("End of conversation");
closeChannel(channel);
}

@Override
public void failed(Throwable exception, ServerSocketHandler handler) {
// Close this channel would cause AsynchronousCloseException
// directly return to avoid noise
if (exception instanceof AsynchronousCloseException) return;
Log.severe(exception);
}
});
}
}

3
package basic.util.socket;

import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;

public class BaseClientSocket extends BaseSocket {
final private String host;

private AsynchronousSocketChannel channel;
public BaseClientSocket(String host, int port) {
super(port);
this.host = host;
}

protected String getHost() {
return host;
}

protected AsynchronousSocketChannel getChannel() {
return channel;
}
protected void openChannel() {
try {
channel = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
protected void closeChannel() {
closeChannel(getChannel());
}
}

4
package basic.util.socket;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.text.MessageFormat;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import basic.util.log.Log;

public class ClientSocket extends BaseClientSocket implements CompletionHandler<Void, ClientSocketHandler> {
private static final String RetrieveFromServer = "Button.RetrieveFromServer";
private Command command;
public ClientSocket(String serverHost, int port) {
super(serverHost, port);
}
public Command getCommand() {
return command;
}

public void run(ClientSocketHandler clientSocketHandler, Command command) {
this.command = command;
openChannel();
getChannel().connect(new InetSocketAddress(getHost(), getPort()), clientSocketHandler, this);
}

@Override
public void completed(Void result, ClientSocketHandler attachment) {
Log.info("Connected to: " + getHost() + "  " + getPort());
ByteBuffer byteBuffer = ByteBuffer.allocate(BufferSize);

try {
// Read the first line
int bytesRead = getChannel().read(byteBuffer).get(ReadTimeOut, TimeUnit.SECONDS);

String readyCommand = readBufferData(byteBuffer, bytesRead);
if (Command.ready.getValue().equals(readyCommand)) {
writeChannel(getChannel(), command.getValue());
Command.match(command.getValue()).ifPresent(command -> command.clientAction(getChannel(), byteBuffer, attachment));
/*if (Command.getServerData.getValue().equals(command.getValue())) {
readToFile(getChannel(), byteBuffer, attachment.getData());
} else if (Command.sendServerResult.getValue().equals(command.getValue())) {
writeChannel(getChannel(), attachment.getData());
writeChannel(getChannel(), Command.Bye.getValue());
}*/
} else {
    String msg = MessageFormat.format(ExceptionWrong, "Command", readyCommand);
attachment.retainThrowable(AccessChannelFailed, new Exception(msg), RetrieveFromServer, getHost(), Integer.toString(getPort()));
}
} catch (InterruptedException | ExecutionException exception) {
attachment.retainThrowable(AccessChannelFailed, exception, RetrieveFromServer, getHost(), Integer.toString(getPort()));
} catch (TimeoutException exception) {
getChannel().write( ByteBuffer.wrap(Command.Bye.getValue().getBytes() ) );
attachment.retainThrowable(AccessChannelFailed, exception, RetrieveFromServer, getHost(), Integer.toString(getPort()));
} /*catch (IOException exception) {
attachment.retainThrowable(BusinessException.Failed, BusinessException.ExceptionFailed, exception, "Write", "Data File", attachment.getData());
} */

attachment.setReady(true);
closeChannel();
}
@Override
public void failed(Throwable exception, ClientSocketHandler attachment) {
attachment.retainThrowable(AccessChannelFailed, exception, RetrieveFromServer, getHost(), Integer.toString(getPort()));
closeChannel();
attachment.setReady(true);
}
}

No comments:

Post a Comment